diff --git a/.gitignore b/.gitignore index a09c56d..e739599 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /.idea +/.venv diff --git a/RuleFlow Studio.lnk b/bin/RuleFlow Studio.lnk similarity index 52% rename from RuleFlow Studio.lnk rename to bin/RuleFlow Studio.lnk index d227062..3220848 100644 Binary files a/RuleFlow Studio.lnk and b/bin/RuleFlow Studio.lnk differ diff --git a/bin/ruleflow.bash b/bin/ruleflow.bash new file mode 100755 index 0000000..931b46e --- /dev/null +++ b/bin/ruleflow.bash @@ -0,0 +1,3 @@ +#!/bin/bash +cd /home/isaac/repos/RuleFlow/src +uv run python -m studio.view diff --git a/bin/ruleflow.bat b/bin/ruleflow.bat new file mode 100644 index 0000000..d683f48 --- /dev/null +++ b/bin/ruleflow.bat @@ -0,0 +1,4 @@ +@echo off +cd C:\local\repos\RuleFlow\src +uv run python -m studio.view +@pause diff --git a/docs/plugin_dev_guide.md b/docs/plugin_dev_guide.md new file mode 100644 index 0000000..117dda6 --- /dev/null +++ b/docs/plugin_dev_guide.md @@ -0,0 +1,140 @@ +RuleFlow Studio Plugin Development Guide + +1. Architectural Overview + +RuleFlow Studio uses a Model-View-Controller (MVC) architecture tightly coupled with a custom reactive signal system. Plugins in RuleFlow Studio act as modular extensions that bridge the Model (engine state) and the View (Textual UI) to provide new functionalities, analysis tools, or visualizers. + +Core Components + +Model (studio.model.Model): The single source of truth. Manages the workspace, project paths, and the core Cellular Automata/Rule engine (self.flow). + +View (studio.view.EditorScreen): The UI controller built on Textual. It manages the layout, consisting of the main workspace (code editor + Plugin Panels) and the right sidebar (Plugin Controls). + +Engine (core.engine.Flow): The mathematical simulation engine. It manages the timeline of Event objects, causality graphs, and multi-way branch evaluations. + +Signals (core.signals.Signal): A synchronous, QT-like event dispatch system. Used to decouple UI interactions from engine events. + +2. The Plugin Interface Contract + +Every plugin must inherit from the studio.model.Plugin abstract base class. A plugin is instantiated once per application lifecycle and relies on the Model to inject runtime dependencies before on_initialized is called. + +Required Attributes (Injected by Model) + +self.name: A string defining the plugin's internal name. + +self.model: Access to the Model layer and the simulation engine (self.model.flow). + +self.view: Access to the Textual EditorScreen for UI modifications and notifications. + +self.cft: A callable (self.view.app.call_from_thread) used to safely update the UI from background worker threads. + +Abstract Methods to Implement + +on_initialized(self) -> None: Executed after dependencies are injected. Used for state initialization and signal connections. + +controls(self) -> Iterator[Widget]: Yields Textual widgets to populate the right-side control sidebar. + +panel(self) -> TabPane | None: Returns the primary workspace widget (a TabPane) to be displayed in the center view, or None if the plugin does not require a central visualizer. + +3. Step-by-Step Implementation Guide + +Step 1: File Structure and Imports + +Plugins must be placed in the /plugins/ directory. They are dynamically loaded at runtime. Standard Textual widgets and specific engine types should be imported as needed. + +from typing import Iterator +from textual.widgets import Collapsible, TabPane, Button, Label, Input +from textual.widget import Widget + +from studio.model import Plugin +from core.engine import FlowLangBase + + +Step 2: Class Definition and Initialization + +Create your class inheriting from Plugin. In on_initialized, define local state variables and map UI/Engine signals to internal handlers. + +class MyPlugin(Plugin): + def on_initialized(self) -> None: + self.name = 'my_plugin' + + # Internal state + self._target_metric: int = 0 + + # Connect View (UI) Signals + self.view.sig_button_pressed.connect(self._handle_button_press) + + # Connect Engine Signals + FlowLangBase.on_evolved_n.connect(self._handle_evolution) + + +Step 3: Building the Sidebar Controls + +Implement the controls() method. It is highly recommended to wrap related controls inside a Collapsible widget to maintain sidebar readability. Always assign strict id strings to interactive widgets; these are required for routing signals later. + + def controls(self) -> Iterator[Widget]: + with Collapsible(title='My Settings', collapsed=False): + yield Button('Calculate', id='my-calc-btn', variant="primary") + + self.metric_input = Input(value='10', id='my-metric-input') + self.metric_input.border_title = 'Target Metric' + yield self.metric_input + + +Step 4: Building the Main Panel + +Implement the panel() method. This defines what is rendered in the center workspace. Use Textual containers (VerticalScroll, Horizontal) to manage layout. + + def panel(self) -> TabPane | None: + self.output_label = Label("Awaiting calculation...") + + return TabPane( + self.name.title(), + self.output_label + ) + + +Step 5: Handling Signals + +Create the handlers bound in on_initialized. + +UI Routing: Use the id of the widget to route logic properly. +Thread Safety: Engine operations (like run.py processing flows) occur in separate workers. If an engine signal triggers a UI update, you must wrap the UI mutation in self.cft(). + + def _handle_button_press(self, event: Button.Pressed) -> None: + if event.button.id == 'my-calc-btn': + try: + # Read from UI state + val = int(self.metric_input.value) + self.view.notify(f"Calculation started for {val}...") + except ValueError: + self.view.notify("Invalid input type.", severity="error") + + def _handle_evolution(self, flow: FlowLangBase, steps: int) -> None: + # Flow execution happens in a worker thread. + # Safely update the Textual UI using self.cft() + + def update_ui(): + self.output_label.update(f"Evolved {steps} steps. Total events: {len(flow.events)}") + + self.cft(update_ui) + + +Step 6: Module Export + +For the dynamic loader to mount the plugin, a singleton instance of the plugin must be initialized at the bottom of the file. + +plugin = MyPlugin() + + +4. Best Practices & Guidelines + +State Management: Do not store complex engine data structures directly on the plugin unless necessary (e.g., caching a graph). Query self.model.flow directly when possible to avoid desync issues. + +Idempotent UI Updates: Because Textual's layout phase resolves via generators (yield), do not rely on on_mount lifecycle hooks inside the plugin class. Build the UI statically in controls and panel, caching widget references as instance variables (e.g., self.my_table = DataTable()) to mutate them later. + +UI Feedback: Always utilize self.view.notify(message, severity) to give users feedback upon completion or failure of a control action. + +Graceful Degradation: Engine states can reset (on_clear). Ensure your plugin listens for reset signals to wipe stale visualization data, preventing IndexError exceptions when referencing destroyed Event arrays. + +Signal Memory Leaks: While RuleFlow's custom signal system cleans up somewhat safely, ensure that your plugin does not continuously connect anonymous lambda functions to the engine without disconnecting them, which may degrade performance. diff --git a/ruleflow_studio.bat b/ruleflow_studio.bat deleted file mode 100644 index e5c4276..0000000 --- a/ruleflow_studio.bat +++ /dev/null @@ -1,3 +0,0 @@ -@echo off -cd C:\local\repos\ruleflow\src -uv run python -m studio.view \ No newline at end of file diff --git a/src/core/engine.py b/src/core/engine.py index 64ee022..c527a42 100644 --- a/src/core/engine.py +++ b/src/core/engine.py @@ -1,4 +1,4 @@ -from typing import Any, Sequence, MutableSequence, NamedTuple, Iterator +from typing import Any, Sequence, MutableSequence, NamedTuple, Iterator, cast, Self from abc import ABC, abstractmethod from dataclasses import dataclass from copy import copy @@ -337,7 +337,7 @@ class DeltaSpace(NamedTuple): # returned by Rule.apply() in a Sequence[DeltaSpa """Single application of a rule within Rule.apply().""" input_space: SpaceState # we always have this filled so that we know what spaces had what changes (if any) made output_space: Sequence[SpaceState | None] # can include many children branches - cell_deltas: Sequence[DeltaCell] # should be aligned with output_space array + cell_deltas: Sequence[DeltaCell] # should be aligned with output_space array (so branches align) def __bool__(self) -> bool: return any(self.output_space) or any(self.cell_deltas) # we check both to be as robust as possible... what if a rule does not return delta cells due to modifying but not adding or deleting? @@ -387,34 +387,47 @@ def spaces(self) -> Iterator[SpaceState]: if space is not None: yield space + @property # maybe cache this? + def spaces_with_metadata(self) -> Iterator[tuple[DeltaSpaces, DeltaSpace, SpaceState]]: + """Returns all newly created spaces along with their metadata (in the parent structure)""" + for r in self.space_deltas: + for space_delta in r.space_deltas: + for space in space_delta.output_space: + if space is not None: + yield r, space_delta, space + def __str__(self): return '[' + ', '.join(str(space) for space in self.spaces) + ']' # TODO remove this to a dedication printer -# We use these noinspections because of stupid PyCharm bug regarding slots and dataclasses that has not been fixed... -# noinspection PyDunderSlots,PyUnresolvedReferences class Flow: """The base class for a rule flow, additional behavior should be implemented by subclassing this class.""" # Signals (can be used to live update analysis objects like the causal graph) - on_evolve: Signal = Signal() - on_undo: Signal = Signal() + on_evolved_step: Signal[Self] = Signal() + on_evolved_n: Signal[Self, int] = Signal() # after all evolves + on_undone_step: Signal[Self] = Signal() + on_undone_n: Signal[Self, int] = Signal() # after all undo's + on_clear: Signal[Self] = Signal() + on_ruleset_set: Signal[Self] = Signal() def __init__(self): - self.rule_set: RuleSet = RuleSet([]) # can be changed at any time to provide a new set of rules. + self.ruleset: RuleSet = RuleSet([]) # can be changed at any time to provide a new set of rules. self.events: list[Event] = [] # defaults to empty... but nothing will work properly - self.event_index_offset: int = 0 + + # progress tracking attributes + self.n_step_progress: float = 0 # percentage of steps run by some_method_n(). def set_ruleset(self, ruleset: RuleSet) -> None: """Used to set the rule set""" - self.rule_set: RuleSet = ruleset + self.ruleset: RuleSet = ruleset + self.on_ruleset_set.emit(self) def set_initial_space(self, initial_space: Sequence[SpaceState]) -> None: """Used to set the initial space""" - self.events.insert( - 0, - Event(0, [DeltaSpaces(tuple((DeltaSpace(i, (i,), (DeltaCell((), ()),)) for i in initial_space)), None)]) # initial output space must be i as well so that next evolve() works. - ) + if not self.events: + self.events.append(cast(Event, cast(object, 0))) + self.events[0] = Event(0, [DeltaSpaces(tuple((DeltaSpace(i, (i,), (DeltaCell((), ()),)) for i in initial_space)), None)]) # initial output space must be `i` as well so that next evolve() works. for i in initial_space: for cell in i.get_all_cells(): cell.created_at = 0 @@ -422,30 +435,17 @@ def set_initial_space(self, initial_space: Sequence[SpaceState]) -> None: def clear_evolution(self) -> None: """Clear the evolution.""" del self.events[1:] + self.on_clear.emit(self) @property def current_event(self) -> Event: - try: - return self.events[self.current_event_idx] - except IndexError: # when the offset is too large or no events exist - if len(self.events) == 0: - raise IndexError('No events exist') - self.set_event_offset(0) # fix offset - return self.current_event + return self.events[-1] @property def current_event_idx(self) -> int: - return len(self.events) - 1 - self.event_index_offset - - def set_event_offset(self, offset: int, from_init: bool = False) -> None: - """Shift the current index in the flow of events""" - if offset < 0: - raise ValueError('Offset cannot be negative') - if from_init: - offset = len(self.events) - 1 - offset - self.event_index_offset = offset + return len(self.events) - 1 - def evolve(self) -> None: + def _evolve(self) -> None: """ Evolve the system by one step. This can be reimplemented by subclasses to modify behavior. As it stands, it does the following: @@ -455,7 +455,7 @@ def evolve(self) -> None: - set the applied rules (the applied rules are associated with the space states they modified) - extract all the modified space states from the applied rules and add them to the space states of the Event. """ - applied_rules: list[DeltaSpaces] = self.rule_set.apply(to_spaces=tuple(self.current_event.spaces)) + applied_rules: list[DeltaSpaces] = self.ruleset.apply(to_spaces=tuple(self.current_event.spaces)) if not any(applied_rules): # if no rules made any modifications to the spaces self.current_event.inert = True return @@ -476,22 +476,29 @@ def evolve(self) -> None: cell.destroyed_at += (current_event_idx,) # first one, of course, will be the main lineage # process causal distance to creation - min_prev: int = min((self.events[e_idx].causal_distance_to_creation for e_idx in self.current_event.causally_connected_events), default=-1) + min_prev: int = min((self.events[e_idx].causal_distance_to_creation + for e_idx in self.current_event.causally_connected_events), + default=-1) self.current_event.causal_distance_to_creation = min_prev + 1 # emit any signals - self.on_evolve.emit(self) + self.on_evolved_step.emit(self) - def evolve_n(self, n_steps: int, break_when_inert: bool = False) -> None: + def evolve(self, n_steps: int, break_when_inert: bool = False) -> None: """Evolve the system n steps.""" - while n_steps > 0: + i: int = 0 + while i < n_steps: # print(str(next(self.current_event.spaces).cells.search_buffer).replace('A', '\x1b[1;41m A \x1b[0m').replace('B', '\x1b[1;42m B \x1b[0m')) # if we want to see how the buffer changes. - self.evolve() - n_steps -= 1 + self.n_step_progress = (i + 1) / n_steps + i += 1 + self._evolve() if break_when_inert and self.current_event.inert: break - def undo(self) -> None: + # emit any signals + self.on_evolved_n.emit(self, n_steps) + + def _undo(self) -> None: """undo the last event...""" if self.current_event_idx == 0: return @@ -500,17 +507,17 @@ def undo(self) -> None: for dc in sd.cell_deltas: for cell in dc.destroyed_cells: cell.destroyed_at = tuple(i for i in cell.destroyed_at if i != self.current_event_idx) - if self.event_index_offset: # shift the created_at up if we are in the middle of an evolution - for cell in dc.new_cells: - cell.created_at = current_event_idx + 1 - self.events.pop(self.current_event_idx) + self.events.pop() # emit any signals - self.on_undo.emit(self) + self.on_undone_step.emit(self) - def undo_n(self, n_steps: int) -> None: + def undo(self, n_steps: int) -> None: for _ in range(n_steps): - self.undo() + self.n_step_progress = (_ + 1) / n_steps + self._undo() + + self.on_undone_n.emit(self, n_steps) def __str__(self) -> str: return '\n'.join(str(e) for e in self.events) diff --git a/src/core/explorer.py b/src/core/explorer.py deleted file mode 100644 index 6918bf2..0000000 --- a/src/core/explorer.py +++ /dev/null @@ -1,254 +0,0 @@ -""" -explorer.py - -Rich-based visualization wrapper for the Flow engine. -Designed to be agnostic between CLI (Rich Console) and TUI (Textual) environments. -""" - -from typing import Any, Optional, Dict, Iterator, List -from rich.console import Console, Style -from rich.table import Table -from rich.text import Text -from rich.live import Live -from rich import box -from rich.color import Color - -# Handle import based on your structure -from src.core.engine import Flow, Event, Cell - - - -class StyleManager: - """ - Manages the mapping between Cell quanta and Rich styles. - """ - - def __init__(self, custom_map: Optional[Dict[Any, str]] = None): - # Default colors (backgrounds used to support both text and block modes) - Style() - self._color_map: Dict[Any, str] = custom_map or { - 'A': "bold white on red", - 'B': "bold white on green", - 'C': "bold white on blue", - } - - # A palette of background styles for auto-assignment - self._palette = [ - "on cyan", "on magenta", "on yellow", "on bright_black", - "on bright_red", "on bright_green", "on bright_blue" - ] - self._ptr = 0 - - def register(self, quanta: Any, style: str) -> None: - """Manually register a style.""" - self._color_map[quanta] = style - - def get_style(self, quanta: Any) -> str: - """Get style for quanta, assigning a new one from palette if unseen.""" - key = str(quanta) - if key not in self._color_map: - # Assign next color in palette - style = self._palette[self._ptr % len(self._palette)] - self._color_map[key] = style - self._ptr += 1 - return self._color_map[key] - - -class FlowExplorerRich: - """ - Visualization engine for Flow. - """ - def __init__(self, - flow: Flow, - console: Optional[Console] = None, - block_mode: bool = False): - """ - Args: - flow: The simulation engine instance. - console: Optional Rich console (created automatically if None). - block_mode: If True, renders cells as empty colored squares (2 spaces). - """ - self.flow = flow - self.console = console or Console(force_terminal=True) - self.styler = StyleManager() - self.block_mode = block_mode - self._table: Optional[Table] = None - - def _render_cell(self, cell: Cell) -> Text: - """ - Renders a single cell. - """ - style = self.styler.get_style(cell.quanta) - - if self.block_mode: - return Text(" ", style=style) - else: - return Text(f" {cell.quanta} ", style=style) - - def _render_space(self, space_state) -> Text: - """Renders a full space (line of cells).""" - text_builder = Text() - cells = getattr(space_state, 'cells', space_state) - for cell in cells: - text_builder.append(self._render_cell(cell)) - return text_builder - - def _setup_table_structure(self, - title: Optional[str] = None, - box_style: box.Box = box.ROUNDED, - show_header: bool = True, - padding: tuple[int, int] = (0, 1), - show_idx: bool = True, - show_causal_dist: bool = False, - show_connected: bool = False) -> Table: - """ - Creates the empty table definition with customizable visuals. - """ - table = Table( - title=title, - box=box_style, - show_header=show_header, - show_edge=True, - padding=padding, - collapse_padding=True, - pad_edge=False - ) - - if show_idx: - table.add_column("Idx", justify="right", style="dim", no_wrap=True) - - table.add_column("State", ratio=1) - - if show_causal_dist: - table.add_column("Dist", justify="center", style="italic") - - if show_connected: - table.add_column("Causes", style="dim", overflow="fold") - - return table - - def render_event_row(self, - idx: int, - event: Event, - show_idx: bool = True, - show_causal_dist: bool = False, - show_connected: bool = False) -> List[Any]: - """ - Generates the raw data for a single row. - """ - row_data = [] - if show_idx: - row_data.append(str(idx)) - - try: - spaces = list(event.spaces) if isinstance(event.spaces, Iterator) else event.spaces - except TypeError: - spaces = [event.spaces] - - rendered_spaces = [self._render_space(s) for s in spaces] - - if len(rendered_spaces) == 1: - row_data.append(rendered_spaces[0]) - else: - row_data.append(Text("\n").join(rendered_spaces)) - - if show_causal_dist: - dist = getattr(event, 'causal_distance_to_creation', '-') - row_data.append(str(dist)) - - if show_connected: - conn = getattr(event, 'causally_connected_events', []) - row_data.append(str(set(conn)) if conn else "-") - - return row_data - - def get_table(self, - title: Optional[str] = None, - box_style: box.Box = box.ROUNDED, - show_header: bool = True, - show_idx: bool = True, - show_causal_dist: bool = False, - show_connected: bool = False) -> Table: - """ - Returns a fully populated Rich Table. - """ - table = self._setup_table_structure( - title=title, - box_style=box_style, - show_header=show_header, - show_idx=show_idx, - show_causal_dist=show_causal_dist, - show_connected=show_connected - ) - - for i, event in enumerate(self.flow.events): - row_data = self.render_event_row(i, event, show_idx, show_causal_dist, show_connected) - table.add_row(*row_data) - - return table - - def explore(self, - show_idx: bool = True, - show_causal_dist: bool = False, - show_connected: bool = False, - title: Optional[str] = None, - box_style: box.Box = box.ROUNDED, - live: bool = False, - refresh_rate: float = 4.0): - """ - CLI Entry point. - """ - self._table = self._setup_table_structure( - title=title, - box_style=box_style, - show_idx=show_idx, - show_causal_dist=show_causal_dist, - show_connected=show_connected - ) - - def add_rows_to_table(): - current_row_count = self._table.row_count - total_events = len(self.flow.events) - - for i in range(current_row_count, total_events): - event = self.flow.events[i] - row = self.render_event_row(i, event, show_idx, show_causal_dist, show_connected) - self._table.add_row(*row) - - if not live: - add_rows_to_table() - self.console.print(self._table) - else: - with Live(self._table, console=self.console, refresh_per_second=refresh_rate) as live_view: - self._table = self._setup_table_structure(title, box_style, True, (0,1), show_idx, show_causal_dist, show_connected) - live_view.update(self._table) - - for i, event in enumerate(self.flow.events): - row = self.render_event_row(i, event, show_idx, show_causal_dist, show_connected) - self._table.add_row(*row) - - -class FlowExplorerTextual: - pass - - -if __name__ == "__main__": - from src.implementations.sss import SSS - - # 1. Run your simulation - system = SSS(rule_set=["ABA -> AAB", "A -> ABA"], initial_space='A') - system.evolve_n(10) - - # 2. Visualize - viz = FlowExplorerRich(system, block_mode=False) - - # Optional: Custom colors (defaults are Red for A, Green for B) - # viz.set_color('A', 'bold red on red') - # viz.set_color('B', 'bold green on green') - - # 3. Render - viz.explore( - show_idx=True, - show_causal_dist=True, - show_connected=True - ) diff --git a/src/core/graph.py b/src/core/graph.py index 76d79c1..fcc7211 100644 --- a/src/core/graph.py +++ b/src/core/graph.py @@ -1,67 +1,33 @@ -"""Provides all the tools for optimized and rigorous graph analysis.""" -from core.engine import Flow -from networkx import MultiDiGraph, write_gexf -from pyvis.network import Network - - -class CausalGraph(MultiDiGraph): - def __init__(self, flow: Flow) -> None: - super().__init__() - self.flow: Flow = flow - - # TODO - maybe add option to collapse edges. - # TODO: make it so that it can be dynamically adjusted as Flow evolves. - # construct causal graph - because each node is literally the time, and thus index, it can be used to query to the actual event for more granular information. - for event in flow.events: - # if event.time == 0: continue # Skip the initial event (no parents) - self.add_node(event.time, label=f'{event.time}', title=f'Causal Distance: {event.causal_distance_to_creation}\nSpace: {event}', shape='box') # add the node - for parent_time in event.causally_connected_events: - if parent_time is not None: - self.add_edge(parent_time, event.time) +"""Provides all the tools for optimized and rigorous graph analysis. - @property - def adjacency_matrix(self): - return None +FRAMEWORK NOTES: +- use pyvis Network to render interactive graphs. - @property - def dijkstra_algorithm(self): - return None - - # same as the the glxl file... - def save_to_gephi_file(self, path: str) -> None: - """Save to the GLXL file format""" - write_gexf(self, path) - - def render_in_browser(self, filename: str = "causal_network.html", - show_controls: list[str] | bool | None = None, - shape: str = 'box', - show_state: bool = False, - show_label: bool = True, - show_distance: bool = True, - collapse_edges: bool = False): - """ - Uses pyvis to render the causal network from a Flow object. - """ +TODO: +- Redesign to support live graph updating (to keep up to date with flow). +- Add more tools for seamless analysis and integrations. +""" +from core.engine import Flow +from networkx import MultiDiGraph +from typing import Sequence, Self - net = Network(height="800px", width="100%", directed=True, filter_menu=True, select_menu=True, cdn_resources='remote') - net.from_nx(self) - if show_controls: - net.show_buttons(filter_=show_controls) - else: - net.set_options(""" - { - "physics": { - "minVelocity": 0.75 - }, - "interaction": { - "navigationButtons": true - } - } - """) - try: - net.save_graph(filename) - print(f"Successfully generated '{filename}'!") - print("Open this file in your browser to view the interactive graph.") - except Exception as e: - print(f"Error generating graph: {e}") +class EventCausalityGraph(MultiDiGraph): + def build(self, flow: Flow, + event_range: tuple[int, int, int], + collapse_multi_edges: bool = False) -> Self: + # construct causal graph - because each node is literally the time, and thus index, it can be used to query to the actual event for more granular information. + connected_container: type[tuple] | type[set] = set if collapse_multi_edges else tuple + for event in flow.events[event_range[0]:event_range[1]+1:event_range[2]]: + causally_connected: Sequence[int] = connected_container(event.causally_connected_events) + self.add_node( + event.time, + # these get passed on to the nodes of VisJS network. + label=f'{event.time}', + title=f' Causal Distance: {event.causal_distance_to_creation}\n' + f'Connected Events: {len(causally_connected)}', + shape='box' + ) + for parent_time in causally_connected: + self.add_edge(parent_time, event.time) + return self diff --git a/src/lang/numerical_helpers.py b/src/core/numlib.py similarity index 98% rename from src/lang/numerical_helpers.py rename to src/core/numlib.py index 133179a..05e8d0f 100644 --- a/src/lang/numerical_helpers.py +++ b/src/core/numlib.py @@ -182,6 +182,10 @@ def str_to_num(num: str) -> int | float: return float(num) +def is_infinity(num: int | Inf) -> bool: + return isinstance(num, Inf) + + if __name__ == '__main__': a = [1, 2, 3] print(a[-INF:1]) diff --git a/src/core/prettier.py b/src/core/prettier.py new file mode 100644 index 0000000..231c862 --- /dev/null +++ b/src/core/prettier.py @@ -0,0 +1,101 @@ +""" +prettier.py +Rich-based visualization wrapper for the Flow engine's SpaceState. + +TODO: +- Add options for Nd rendering. +""" +from string import ascii_uppercase, ascii_lowercase, digits +from typing import Iterator +from rich.text import Text +from core.engine import SpaceState + + +COLOR_PALETTE: list[str] = [ + '#1a4e8b', '#8b0000', '#2d8b2d', '#8b1a72', '#00728b', '#8b5e1a', '#4e1a8b', '#6b8b1a', '#8b1a43', '#1a7e8b', + '#7b8b1a', '#5c1a8b', '#1a8b43', '#8b431a', '#1a2d8b', '#8b7b1a', '#8b1a62', '#1a8b7e', '#3c1a8b', '#3d8b1a', + '#8b221a', '#1a5c8b', '#1a8b22', '#6e1a8b', '#5a8b1a', '#8b1a82', '#1a8b51', '#8b691a', '#2d1a8b', '#4e8b1a', + '#8b1a33', '#1a788b', '#7e8b1a', '#471a8b', '#1a8b34', '#8b3d1a', '#1a3d8b', '#628b1a', '#8b1a6e', '#1a648b', + '#758b1a', '#221a8b', '#1a8b4e', '#8b511a', '#348b1a', '#511a8b', '#1a8b6e', '#8b1a22', '#1a4e8b', '#5a8b1a', + '#6e1a8b', '#7b8b1a', '#1a8b3d', '#8b2d1a', '#1a228b', '#6b8b1a', '#8b1a7b', '#1a7b8b', '#838b1a', '#431a8b', + '#228b1a', '#8b1a4e', '#64676E' +] # 63 colors for ascii_uppercase + ascii_lowercase + digits + '_' + + +class SpaceStateStringFormatter: + def __init__(self) -> None: + # We have these here for maximal configuration. + self.default_colors = COLOR_PALETTE.copy() + self.chars = ascii_uppercase + ascii_lowercase + digits + '_' + + # this holds pre-rendered Text objects for every character + self._rich_mapping: dict[str, Text] = {} + self.cell_id_style: str = 'on black' + + # initial build + self.config() + + def config(self, styling: bool = True, + default_style_to_symbol: bool = False, + show_symbols: bool = True, + cell_padding: bool = True, + style_mapping_override: dict[str, str] = None, + clear_default_styles_on_override: bool = False, + symbol_mapping_override: dict[str, str] = None) -> None: + """ + Pre-computes the Text objects for the mapping. + All logic is handled here so __call__ is a pure lookup. + """ + if style_mapping_override is None: style_mapping_override = {} + if symbol_mapping_override is None: symbol_mapping_override = {} + + new_mapping = {} + for i, char in enumerate(self.chars): + # determine Display Symbol + display = symbol_mapping_override.get(char, char) if show_symbols else "" + # apply Padding + content = f" {display} " if cell_padding else display + # determine Style + style = "" + if styling: + default_style = self.default_colors[i] if default_style_to_symbol else f'on {self.default_colors[i]}' + if clear_default_styles_on_override and style_mapping_override: + default_style = '' + style = style_mapping_override.get(char, default_style) + # pre-render and cache the Text object + new_mapping[char] = Text(content, style=style, end='') + + new_mapping["\n"] = Text("\n", end='') # CRITICAL: Cache the newline so __call__ doesn't create objects for it + self._rich_mapping = new_mapping + + def __call__(self, s: SpaceState, highlight_cells_with_id: frozenset[int] = frozenset()) -> Text: + """Fast join using the pre-computed mapping. Also highlight specific cells matching highlight_cells_with_id.""" + rm = self._rich_mapping + cell_id_style = self.cell_id_style + def iter_cells() -> Iterator[Text]: + # noinspection PyUnresolvedReferences + for c in s.cells: + cell = rm.get(str(c), Text(str(c), end='')) + if id(c) in highlight_cells_with_id: + cell = cell.copy() + cell.stylize(cell_id_style) + yield cell + return Text(end='').join(iter_cells()) + + def convert_pure_str(self, string: str) -> Text: + """Utility method in case a given string needs to be styles the same as the space states (can be used in a ruleset printer for instance).""" + rm = self._rich_mapping + return Text(end='').join(rm.get(str(c), Text(str(c), end='')) for c in string) + +if __name__ == "__main__": + from implementations.sss import SSS + from rich.console import Console + + # run your simulation + system = SSS(rule_set=["ABA -> AAB", "A -> ABA"], initial_space='AB' + ascii_uppercase + ascii_lowercase + digits) + system.evolve(0) + formatter = SpaceStateStringFormatter() + formatter.config(show_symbols=True) + console = Console(width=1000) + for event in system.events: + console.print(formatter(event.spaces.__next__())) diff --git a/src/core/signals.py b/src/core/signals.py index 6771267..69728ce 100644 --- a/src/core/signals.py +++ b/src/core/signals.py @@ -1,52 +1,52 @@ -from typing import Callable, Hashable, Any +from typing import Callable, Any +from inspect import signature -class Signal: - """Implements a QT-like signal system with instance-specific filtering. +class Signal[*T]: + """Implements a QT-like signal system for interactive programming. - Please note that any connected functions will KEEP result in them staying alive in memory until disconnected (so make sure to disconnect any methods before deleting and object). + For convenience, the emitter does not force the connected callable to take all the arguments that are emitted. + Thus, even if the emitter includes 2 arguments, but the callee only expects 1 or 0, the *args will be truncated. + If, however, there are more than the expected number of arguments, an error will occur as expected. + + Please note that any connected functions/methods will result in them (or their objects because methods are bound + and thus ephemeral) staying ALIVE in memory until disconnected, so make sure to disconnect any methods before + deleting an object. + + If you want to implement signals globally, define them on the class space. Conversely, if you want signals to + be tied to a certain classes instances, define them as attributes. It may be useful to do both, hence you would + prefix or suffix the signal names when defining them in both the instance and class spaces. Alternatively, you + could also pass `self` when emitting and let the client decide what to do based on that. """ - __slots__ = ('callables', 'restricted_callables') + __slots__ = ('callables',) def __init__(self) -> None: - self.callables: list[Callable] = [] - # Maps a specific instance to a list of callbacks intended only for it (the caller must pass the hashable as args[0]) - self.restricted_callables: dict[Hashable, list[Callable]] = {} + self.callables: list[tuple[Callable[[*T], Any], int]] = [] @property def callables_count(self) -> int: - return len(self.callables) + sum(len(v) for v in self.restricted_callables.values()) - - def emit(self, *args: Any, **kwargs: Any) -> None: - # Standard global callbacks - for c in self.callables: - c(*args, **kwargs) - - # Instance-restricted callbacks - try: - for c in self.restricted_callables[args[0]]: - c(*args, **kwargs) - except (TypeError, # in the event that args[0] is not hashable... must use try-catch due to fake hash being detected on certain objects (like RuleMatch) that contain un-hashables. - KeyError, # if the args[0] does not exist in the dictionary - IndexError): # if the index 0 does not exist in args. - pass - - def connect(self, func: Callable, restrict_to_instance: Hashable = None) -> None: - if restrict_to_instance is not None: - callbacks = self.restricted_callables.setdefault(restrict_to_instance, []) - if func not in callbacks: - callbacks.append(func) - else: - if func not in self.callables: - self.callables.append(func) - - def disconnect(self, func: Callable, restrict_to_instance: Hashable = None) -> None: - try: - if restrict_to_instance is not None: - self.restricted_callables[restrict_to_instance].remove(func) - if not self.restricted_callables[restrict_to_instance]: - del self.restricted_callables[restrict_to_instance] - else: - self.callables.remove(func) - except (ValueError, KeyError): - pass + return len(self.callables) + + def emit(self, *args: *T) -> None: + for c, arg_len in self.callables: + c(*args[:arg_len]) + + def connect(self, c: Callable[..., Any]) -> None: + if c not in self.callables: + self.callables.append((c, len(signature(c).parameters))) + + def disconnect(self, c: Callable[..., Any]) -> None: + matches: list[int] = [i for i, (c_, _) in enumerate(self.callables) if c_ is c] + for i in reversed(matches): + self.callables.pop(i) + + +if __name__ == "__main__": + t: Signal[str, str] = Signal() + t1: Signal = Signal() + t1.emit() # highlighting bug... + f1 = lambda a, b: print(a) + f2 = lambda a, b, c: print(a, b) + t.connect(f1) + t.connect(f2) + t.emit("yup", "") diff --git a/src/core/vec.py b/src/core/vec.py index ccf08f5..8747668 100644 --- a/src/core/vec.py +++ b/src/core/vec.py @@ -198,6 +198,10 @@ def __copy__(self): def __deepcopy__(self, memo): # force it to use self.branch for safety return self.branch() + def refresh_search_buffer(self): + """Must be called if you want to refresh a dirty search buffer.""" + self.search_buffer = bytearray((ord(c.quanta) for c in self.vec)) + # ================ Viewer Methods ================ def __len__(self): return len(self.vec) @@ -288,7 +292,7 @@ def branch(self) -> TrieVec: nv: TrieVec = object.__new__(TrieVec) nv.vec = self.vec # we don't need to copy as edit() will do that for us nv.evolver = None - nv.search_buffer = self.search_buffer # note: becomes out-of-date on self after branch (use branch_search_buffer(rfp=True) to reconstruct clean buffer for self) + nv.search_buffer = self.search_buffer # note: becomes dirty on self after branch # we could auto enter edit mode here... however, that is not necessary as this should work just fine because it is auto entered upon edits. return nv diff --git a/src/implementations/__init__.py b/src/implementations/__init__.py index e69de29..6061f83 100644 --- a/src/implementations/__init__.py +++ b/src/implementations/__init__.py @@ -0,0 +1,5 @@ +"""This module provides core implementations of specific types of systems such as sequential substitution systems +or cellular automata. It uses the core tools and constructs (strictly) to provide implementations for such systems. +A system inherits from the Flow object and utilizes all the various constructs such RuleMatches and Cells to +achieve desired behavior. +""" diff --git a/src/implementations/sss.py b/src/implementations/sss.py index abcd566..50b2b8b 100644 --- a/src/implementations/sss.py +++ b/src/implementations/sss.py @@ -43,7 +43,7 @@ def __init__(self, rule_set: list[str], initial_space: str): if __name__ == "__main__": sss = SSS(["ABA -> AAB", "A -> ABA"], "AB") - sss.evolve_n(20) + sss.evolve(20) print(sss) # from core.graph import CausalGraph # g = CausalGraph(sss) diff --git a/src/integrations/__init__.py b/src/integrations/__init__.py index e69de29..f10487c 100644 --- a/src/integrations/__init__.py +++ b/src/integrations/__init__.py @@ -0,0 +1,3 @@ +"""This module is for providing integrations with external tools or frameworks. Examples could be Gephi, Wolfram Alpha, +Manim, and many other tools. It provides various tools and constructs for such integrations. +It can also provide other tools such as our own enumeration implementations, icat tools, etc.""" diff --git a/src/core/enumerator.py b/src/integrations/enumerator.py similarity index 96% rename from src/core/enumerator.py rename to src/integrations/enumerator.py index cbc782d..79be83c 100644 --- a/src/core/enumerator.py +++ b/src/integrations/enumerator.py @@ -1,3 +1,7 @@ +""" +This code currently contains rudimentary enumeration decoder tools for rulesets. Later we will want to +generalize this into framework of tools for this type of tooling. +""" import math diff --git a/src/integrations/icat/__init__.py b/src/integrations/icat/__init__.py new file mode 100644 index 0000000..b0e7500 --- /dev/null +++ b/src/integrations/icat/__init__.py @@ -0,0 +1,4 @@ +""" +Provides useful tools as a result of the I-Cat research. This module will likely be separated into its own framework +outside RuleFlow. +""" diff --git a/src/integrations/icat/core.py b/src/integrations/icat/core.py new file mode 100644 index 0000000..a972688 --- /dev/null +++ b/src/integrations/icat/core.py @@ -0,0 +1,9 @@ +class icat: + def __init__(self): + pass + + def __add__(self, other): + pass + + def __sub__(self, other): + pass diff --git a/src/lang/implementation.py b/src/lang/implementation.py index 2bc13d5..95a9f88 100644 --- a/src/lang/implementation.py +++ b/src/lang/implementation.py @@ -1,17 +1,17 @@ """The implementation for 1D space that supports the language features. Policy: -- Any multiways should have the search_buffer optimization disabled (Vec.enable_search_buffer(False)) so that it doesn't +- Any multi-ways should have the search_buffer optimization disabled (Vec.enable_search_buffer(False)) so that it doesn't become corrupt when branching (one state spawning two states). We have considered coding buffer branching logic... -however, that does not cover everything as Engine.RuleSet.apply() with group_break=False will not branch the buffer. +however, that does not cover everything as Engine.RuleSet.apply() with group_break=False will not branch the buffer (forgot why). In the future, we may consider making the search_buffer branch-able (really, it is already possible by manually using Vec.search_buffer.copy()). Future Considerations: - We will need to create different implementations for higher dimensions spaces. """ -from typing import Sequence, NamedTuple, Literal, cast, Iterator +from typing import Sequence, NamedTuple, Literal, cast, Iterator, Self from copy import deepcopy, copy -from lang.numerical_helpers import INF +from core.numlib import INF from core.signals import Signal from core.engine import ( SpaceState1D as SpaceState, @@ -36,12 +36,12 @@ class Target(NamedTuple): class BaseRule(RuleABC): # ======== Signals ======== # NOTE: time.sleep() can be used by the client to pause flow execution temporally (or play notes, etc.). - on_applied: Signal = Signal() # if the apply() function was called. The modified spaces is passed as Sequence[DeltaSpace] so that the client can test if the rule was effective. + on_applied: Signal[Self, Sequence[DeltaSpace]] = Signal() # if the apply() function was called. The modified spaces are passed as Sequence[DeltaSpace] so that the client can test if the rule was effective. # the three following rules get the RuleMatch along with idx of the current match passed as arguments to the client. - on_execution: Signal = Signal() - on_branch: Signal = Signal() - on_conflict: Signal = Signal() + on_execution: Signal[Self, RuleMatch, int] = Signal() + on_branch: Signal[Self, RuleMatch, int] = Signal() + on_conflict: Signal[Self, RuleMatch, int] = Signal() FLAG_ALIAS: dict[str, str] = { # IMPORTANT!!!: these must be kept up-to-date with the actual attributes. @@ -72,7 +72,7 @@ class BaseRule(RuleABC): def __init__(self, selector: Sequence[Selector], target: Sequence[Target]): super().__init__() # Functionality Fields (you can have multiple selectors and multiple targets) - self.selectors: Sequence[Selector] = selector # used by self.match() + self.selector: Sequence[Selector] = selector # used by self.match() self.target: Sequence[Target] = target # used by self.apply() # Complex Functionality @@ -106,9 +106,8 @@ def __init__(self, selector: Sequence[Selector], target: Sequence[Target]): # Note that additional flags can be set in the syntax, however, they will have no meaning unless included in the control flow by subclassing and modifying particular rule. - def __repr__(self): - return f"{self.__class__.__name__}({[s.selector for s in self.selectors]}, {[t.target for t in self.target]})" + return f"{self.__class__.__name__}({[s.selector for s in self.selector]}, {[t.target for t in self.target]})" def _conflict_detector(self, current_matches: list[tuple[int, int]], match: tuple[int, int]) -> set[int]: """helper that detects collisions between selectors""" @@ -145,7 +144,7 @@ def match(self, spaces: Sequence[SpaceState]) -> Sequence[RuleMatch]: for self in top_self.chain: if self.disabled: # we must check if the rule has been disabled in case the rule is in a chain (has been merged) continue - for pattern in self.selectors: + for pattern in self.selector: finds: Iterator[tuple[int, int]] if pattern.type in ('literal', 'regex'): # finds = space.find(tuple(Cell(c) for c in pattern.selector)) # older slow way (before Vec containers) diff --git a/src/lang/interpreter.py b/src/lang/interpreter.py index 6b54ada..e4d6d31 100644 --- a/src/lang/interpreter.py +++ b/src/lang/interpreter.py @@ -122,19 +122,30 @@ def interpret_directives(objects: dict[str, Any], directives: list[tuple[str, An return returns -# TODO: decouple the self.events from the compilation phase so that rulesets can be changed without affecting the current evolution!!! class FlowLangBase(Flow): """The general API of the Flow object used in all language implementations.""" - def interpret(self, s: str) -> None: - """Should set the current ruleset and initial space based on interpreted string. Also, handle directives.""" - raise NotImplementedError() - def interpret_file(self, path: str) -> None: """opens `.flow` files and constructs a FlowLang object.""" with open(path, 'r') as f: return self.interpret(f.read()) + def interpret(self, s: str) -> None: + """Should set the current ruleset and initial space based on interpreted string. Also, handle directives.""" + raise NotImplementedError() + + def undo(self, n_steps: int) -> None: + super().undo(n_steps) + for space in self.current_event.spaces: # we must remember to refresh the search buffer if undoing anything... + # noinspection PyUnresolvedReferences + space.cells.refresh_search_buffer() + + def clear_evolution(self) -> None: + super().clear_evolution() + for space in self.current_event.spaces: # we must remember to refresh the search buffer if clearing anything... + # noinspection PyUnresolvedReferences + space.cells.refresh_search_buffer() + class FlowLang(FlowLangBase): """The main interpreter object, it is what actually runs any given code.""" @@ -167,18 +178,21 @@ def interpret(self, s: str) -> None: ) )) Vec: type[vec.Vec] = getattr(vec, r.get('mem', vec.Vec.__name__)) # this is the vector we use (vec.Vec is the default) - self.set_initial_space([SpaceState(Vec([Cell(s) for s in string])) for string in r['init']]) + if not self.events: + self.set_initial_space([SpaceState(Vec([Cell(s) for s in string])) for string in r['init']]) # after instantiation interpret_directives({ - 'evolve': self.evolve_n, + 'evolve': self.evolve, + 'undo': self.undo, + 'clear': self.clear_evolution, 'merge': self.__merge_group, 'compress': self.__compress_group }, self.ast['directives']) def __merge_group(self, identifier: int | str): """A directive to merge a particular group into a chain (a composite rule)""" - rules: list[BaseRule] = cast(list[BaseRule], self.rule_set.rules) + rules: list[BaseRule] = cast(list[BaseRule], self.ruleset.rules) for i in range(len(rules)): if rules[i].disabled: continue @@ -191,8 +205,8 @@ def __merge_group(self, identifier: int | str): break def __compress_group(self, identifier: int | str): - """Compress a Rule Group such that causality is preserved (no cellular change if the characters look the same)""" - rules: list[BaseRule] = [rule for rule in cast(list[BaseRule], self.rule_set.rules) + """A directive to compress a Rule Group such that causality is preserved (no cellular change if the characters look the same)""" + rules: list[BaseRule] = [rule for rule in cast(list[BaseRule], self.ruleset.rules) if rule.group == identifier and not rule.disabled] # If any rule makes no changes, disable it. for rule in rules: @@ -200,7 +214,7 @@ def __compress_group(self, identifier: int | str): continue rule_is_active: bool = False for target in rule.target: - for selector in rule.selectors: + for selector in rule.selector: for s_char, t_char in zip(selector.selector, target.target): # we only care about the first/primary target... (we can't determine how multiple targets will behave on different match sets) if t_char.quanta == '_': continue @@ -216,7 +230,6 @@ def __compress_group(self, identifier: int | str): import gc import timeit - def get_mem(): """Returns current resident set size in MB.""" process = psutil.Process(os.getpid()) @@ -238,7 +251,7 @@ def get_mem(): // A -> ACB; """ flow = FlowLang(code) - time = timeit.timeit(lambda: flow.evolve_n(18), number=1) + time = timeit.timeit(lambda: flow.evolve(18), number=1) mem_end = get_mem() print(f"Total Memory of evolution: {mem_end - mem_start:.2f} MB") diff --git a/src/lang/parser.py b/src/lang/parser.py index 74ac6e9..6e1b1c8 100644 --- a/src/lang/parser.py +++ b/src/lang/parser.py @@ -1,6 +1,6 @@ -from core import enumerator +from integrations import enumerator from lark import Lark, Transformer -from lang.numerical_helpers import str_to_num, INF +from core.numlib import str_to_num, INF from typing import Any, cast diff --git a/src/studio_plugins/README.md b/src/studio-plugins/README.md similarity index 100% rename from src/studio_plugins/README.md rename to src/studio-plugins/README.md diff --git a/src/__init__.py b/src/studio-plugins/gephi/gephi-live.py similarity index 100% rename from src/__init__.py rename to src/studio-plugins/gephi/gephi-live.py diff --git a/src/studio_plugins/_llm_module.py b/src/studio-plugins/llm/_llm_module.py similarity index 100% rename from src/studio_plugins/_llm_module.py rename to src/studio-plugins/llm/_llm_module.py diff --git a/src/studio/stdplgns/__init__.py b/src/studio-plugins/sessie/enum.py similarity index 100% rename from src/studio/stdplgns/__init__.py rename to src/studio-plugins/sessie/enum.py diff --git a/src/studio-plugins/tunes.py b/src/studio-plugins/tunes.py new file mode 100644 index 0000000..68d0d1e --- /dev/null +++ b/src/studio-plugins/tunes.py @@ -0,0 +1,393 @@ +""" +This plugin provides auditory exploration of cellular systems using the SCAMP library. +It translates spatial cell configurations into musical chords and arpeggios, and allows +for sheet music transcription. + +Note: https://www.soundslice.com/ is an excellent site for playing back the generated xml files. +""" +# Textual Imports +from textual.widgets import (Collapsible, TabPane, Input, Select, + Checkbox, Button, Label, RichLog) +from textual.widget import Widget +from textual.containers import VerticalScroll, HorizontalGroup + +# Standard Imports +from typing import Iterator +import threading +from studio.model import Plugin + +# Attempt to import SCAMP. If it fails, we flag it so the UI can warn the user. +try: + import scamp + SCAMP_AVAILABLE = True +except ImportError: + SCAMP_AVAILABLE = False + + +# ==== Musical Scales & Progressions ==== +SCALES = { + "C Pentatonic Major (Bright)": [0, 2, 4, 7, 9], + "A Aeolian Minor (Melancholic)": [0, 2, 3, 5, 7, 8, 10], + "F Lydian (Dreamy)": [0, 2, 4, 6, 7, 9, 11], + "D Harmonic Minor (Classical)": [0, 2, 3, 5, 7, 8, 11], + "Whole Tone (Ethereal)": [0, 2, 4, 6, 8, 10] +} + + +def get_extended_scale(scale_intervals: list[int], octaves: int = 5, base_midi: int = 48) -> list[int]: + """Extends a set of scale intervals across multiple octaves.""" + pitches = [] + for oct in range(octaves): + for interval in scale_intervals: + pitches.append(base_midi + (oct * 12) + interval) + return pitches + + +class P(Plugin): + def on_initialized(self) -> None: + self.name = 'tunes' + + # Internal State + self._is_playing: bool = False + self._playback_thread: threading.Thread | None = None + self._stop_event: threading.Event = threading.Event() + + # Connect UI Signals + self.view.sig_button_pressed.connect(self._handle_button_press) + + def controls(self) -> Iterator[Widget]: + with Collapsible(title='Playback Controls', collapsed=False): + with HorizontalGroup(): + yield Button('▶ Play', id='btn-play-music', variant="success") + yield Button('■ Stop', id='btn-stop-music', variant="error") + + yield Label("\nTempo (BPM)") + self.bpm_input = Input(value='120', placeholder='e.g. 120') + yield self.bpm_input + + yield Label("Playback Timing") + self.timing_select = Select( + [("Sequential (Arpeggio)", "seq"), ("Simultaneous (Chord)", "chord")], + value="seq", allow_blank=False + ) + yield self.timing_select + + with Collapsible(title='Musical Mapping', collapsed=False): + yield Label("Base Instrument") + self.base_instrument_select = Select( + [ + ("Piano", "piano"), ("Marimba", "marimba"), + ("Vibraphone", "vibraphone"), ("Cello", "cello"), + ("Harp", "harp"), ("Glockenspiel", "glockenspiel"), + ("Standard Drumkit", "drumkit:standard"), + ("Electronic Drumkit", "drumkit:electronic"), + ("808 Drumkit", "drumkit:808"), + ("Jazz Drumkit", "drumkit:jazz"), + ("Orchestra Drumkit", "drumkit:orchestra"), + ("Woodblock", "woodblock") + ], + value="piano", allow_blank=False + ) + yield self.base_instrument_select + + yield Label("Instrument Map (Quanta: Instr)") + self.instrument_map_input = Input(value='', placeholder='e.g. A: drumkit:808, B: cello') + yield self.instrument_map_input + + yield Label("Pitch Mapping Mode") + self.mapping_mode = Select( + [ + ("Position + Quanta Combined", "combined"), + ("Quanta Value Only", "quanta"), + ("Spatial Position Only", "position") + ], + value="combined", allow_blank=False + ) + yield self.mapping_mode + + yield Label("Ignored Quanta (Comma Separated)") + self.ignored_input = Input(value='.,0,_', placeholder='e.g. .,0,_') + yield self.ignored_input + + with Collapsible(title='Scale & Tuning Config', collapsed=True): + yield Label("Scale / Progression") + self.scale_select = Select( + [(name, name) for name in SCALES.keys()] + [("Custom (Use Intervals Below)", "custom")], + value="C Pentatonic Major (Bright)", allow_blank=False + ) + yield self.scale_select + + yield Label("Custom Intervals (Comma Separated)") + self.custom_scale_input = Input(value='0, 2, 4, 7, 9', placeholder='e.g. 0, 2, 4, 7, 9') + yield self.custom_scale_input + + yield Label("Number of Octaves") + self.octaves_input = Input(value='5', placeholder='e.g. 5') + yield self.octaves_input + + yield Label("Base MIDI Note (C3 = 48)") + self.base_midi_input = Input(value='48', placeholder='e.g. 48') + yield self.base_midi_input + + with Collapsible(title='Transcription & Export', collapsed=False): + self.export_checkbox = Checkbox('Record Performance', value=False) + yield self.export_checkbox + + self.export_format = Select( + [("MusicXML (.xml)", "xml"), ("MIDI (.mid)", "mid")], + value="xml", allow_blank=False + ) + yield self.export_format + + def panel(self) -> TabPane | None: + self.log_view = RichLog(id="tunes-log-view", highlight=True, markup=True, wrap=True) + return TabPane( + self.name.title(), + VerticalScroll( + self.log_view + ) + ) + + def _handle_button_press(self, event: Button.Pressed) -> None: + if event.button.id == 'btn-play-music': + self._start_playback() + elif event.button.id == 'btn-stop-music': + self._stop_playback() + + def _start_playback(self) -> None: + if not SCAMP_AVAILABLE: + self.log_view.write("[bold red]SCAMP library not found![/bold red]") + self.log_view.write("Please run `pip install scamp` in your environment to use this plugin.") + return + + if self._is_playing: + self.view.notify("Music is already playing.", severity="warning") + return + + if not self.model.flow.events: + self.log_view.write("[bold red]No simulation events found. Run the engine first.[/bold red]") + return + + try: + float(self.bpm_input.value) + except ValueError: + self.view.notify("Invalid BPM value.", severity="error") + return + + self.log_view.clear() + self._is_playing = True + self._stop_event.clear() + + # Use native python threading to allow immediate interruption via threading.Event + self._playback_thread = threading.Thread(target=self._playback_loop, daemon=True) + self._playback_thread.start() + + def _stop_playback(self) -> None: + # Executes directly in UI thread. No self.cft() necessary. + if self._is_playing: + self.log_view.write("[bold yellow]Stopping playback gracefully...[/bold yellow]") + self._stop_event.set() + + def _wait_interruptable(self, session: 'scamp.Session', duration: float) -> bool: + """ + Blocks the SCAMP clock in small increments so that the stop event can interrupt + long musical rests or notes immediately without freezing the application thread. + Returns False if interrupted. + """ + waited = 0.0 + step = 0.1 + while waited < duration: + if self._stop_event.is_set(): + return False + wait_time = min(step, duration - waited) + session.wait(wait_time) + waited += wait_time + return True + + def _playback_loop(self) -> None: + """The main blocking thread loop that processes the cellular flow and plays audio.""" + session = None + try: + bpm = float(self.bpm_input.value) + base_instrument_name = self.base_instrument_select.value + scale_name = self.scale_select.value + playback_timing = self.timing_select.value + mapping_mode = self.mapping_mode.value + ignored_chars = [c.strip() for c in self.ignored_input.value.split(',')] + + # Parse Instrument Map + instr_map = {} + if self.instrument_map_input.value.strip(): + pairs = self.instrument_map_input.value.split(',') + for pair in pairs: + if ':' in pair: + q, instr = pair.split(':', 1) + instr_map[q.strip()] = instr.strip() + + # Setup SCAMP Session + session = scamp.Session(tempo=bpm) + + # Preload parts to avoid stutter during mid-playback instantiation + parts = {} + def get_part(name: str): + name_lower = name.lower() + is_drum = name_lower.startswith("drumkit") or name_lower in ("percussion", "drums") + + if is_drum: + # Determine General MIDI drumkit preset based on the tag + preset_num = 0 # Default to Standard Kit + if "electronic" in name_lower: + preset_num = 24 + elif "808" in name_lower: + preset_num = 25 + elif "jazz" in name_lower: + preset_num = 32 + elif "orchestra" in name_lower: + preset_num = 48 + + # Cache key needs to be unique per kit type + actual_name = f"Drums_{preset_num}" + if actual_name not in parts: + # Map to General MIDI Bank 128 (Percussion) with the specified Preset + parts[actual_name] = session.new_part("Drums", preset=(128, preset_num)) + return parts[actual_name] + else: + if name not in parts: + parts[name] = session.new_part(name) + return parts[name] + + get_part(base_instrument_name) + for instr_name in instr_map.values(): + get_part(instr_name) + + # Parse Tuning Parameters + try: + octaves = int(self.octaves_input.value) + except ValueError: + octaves = 5 + + try: + base_midi = int(self.base_midi_input.value) + except ValueError: + base_midi = 48 + + if scale_name == "custom": + try: + intervals = [int(x.strip()) for x in self.custom_scale_input.value.split(',')] + except ValueError: + intervals = SCALES["C Pentatonic Major (Bright)"] # fallback + else: + intervals = SCALES.get(scale_name, SCALES["C Pentatonic Major (Bright)"]) + + # Generate our spatial mapping scale + extended_scale = get_extended_scale(intervals, octaves=octaves, base_midi=base_midi) + + if self.export_checkbox.value: + session.start_transcribing() + self.cft(self.log_view.write, "[bold green]Transcription started...[/bold green]") + + self.cft(self.log_view.write, f"[bold green]Playing {scale_name}...[/bold green]") + + for event in self.model.flow.events: + if self._stop_event.is_set(): + break + + try: + # Extract the first space from the multi-way branches for deterministic music + space = next(event.spaces) + cells = list(space.get_all_cells()) + except StopIteration: + continue + + elements_to_play = [] + + # Map the cells to pitch scale degrees and instruments + for i, cell in enumerate(cells): + val = str(cell.quanta).strip() + if val and val not in ignored_chars: + + # Instrument Resolution + instr = instr_map.get(val, base_instrument_name) + is_drum = instr.lower().startswith("drumkit") or instr.lower() in ("percussion", "drums") + + # Pitch / Note Resolution + if mapping_mode == "position": + scale_index = i + elif mapping_mode == "quanta": + scale_index = sum(ord(c) for c in val) + else: + scale_index = sum(ord(c) for c in val) + i + + # Drums require specific MIDI notes (General MIDI Drums: 35-81). + # High melodic pitches (e.g., > 81) produce no sound on drum tracks! + if is_drum: + pitch = 35 + (scale_index % 47) + else: + pitch = extended_scale[scale_index % len(extended_scale)] + + elements_to_play.append((i, val, pitch, instr)) + + if not elements_to_play: + self.cft(self.log_view.write, f"Event {event.time}: [grey]Rest[/grey]") + if not self._wait_interruptable(session, 1.0): break + continue + + if playback_timing == "chord": + # Play all notes for this event simultaneously across multiple instruments + log_str = ", ".join(f"{v}->{p}({i_name})" for _, v, p, i_name in elements_to_play) + self.cft(self.log_view.write, f"Event {event.time}: [blue]Chord [{log_str}][/blue]") + + for _, _, pitch, instr in elements_to_play: + part = get_part(instr) + part.play_note(pitch, 0.6, 1.0, blocking=False) + + if not self._wait_interruptable(session, 1.0): break + else: + # Play notes sequentially (Arpeggio style). Each note gets 0.25 beats (16th note) + log_str = ", ".join(f"[{idx}]{v}->{p}({i_name})" for idx, v, p, i_name in elements_to_play) + self.cft(self.log_view.write, f"Event {event.time}: [blue]Seq [{log_str}][/blue]") + + note_dur = 0.25 + for idx, val, pitch, instr in elements_to_play: + if self._stop_event.is_set(): break + part = get_part(instr) + part.play_note(pitch, 0.6, note_dur, blocking=False) + if not self._wait_interruptable(session, note_dur): break + + # ==== Cleanup and Export Pipeline ==== + if self.export_checkbox.value and not self._stop_event.is_set(): + self.cft(self.log_view.write, "\n[bold cyan]Processing Performance Export...[/bold cyan]") + performance = session.stop_transcribing() + + export_fmt = self.export_format.value + export_path = self.model.project_path / f"cellular_score_{self.model.flow_path.stem}.{export_fmt}" + + try: + if export_fmt == 'xml': + # Valid SCAMP 0.8+ API sequence to export MusicXML + score = performance.to_score(title=f"Cellular Automata - {scale_name}") + score.export_music_xml(str(export_path)) + self.cft(self.log_view.write, f"[bold green]Exported MusicXML to: {export_path}[/bold green]") + elif export_fmt == 'mid': + performance.export_to_midi_file(str(export_path)) + self.cft(self.log_view.write, f"[bold green]Exported MIDI to: {export_path}[/bold green]") + + except Exception as e: + self.cft(self.log_view.write, f"[bold red]Failed to export: {str(e)}[/bold red]") + + except Exception as e: + self.cft(self.log_view.write, f"[bold red]Playback Error: {str(e)}[/bold red]") + + finally: + # Prevents sustained instruments (Cello, etc.) from ringing out indefinitely. + if session is not None: + try: + session.kill() + except Exception: + pass + + self.cft(self.log_view.write, "\n[bold yellow]Playback concluded.[/bold yellow]") + self._is_playing = False + + +plugin = P() diff --git a/src/studio/model.py b/src/studio/model.py index 1e3bbe1..6869418 100644 --- a/src/studio/model.py +++ b/src/studio/model.py @@ -1,16 +1,9 @@ -"""The model side of the MVC paradigm - -# TODO: -- Clean up the way the Model part of the MVC is designed. -- Connect it up to the View/Controller. -""" -from typing import Optional, Any +"""The model side of the MVC paradigm""" +from typing import Optional, Iterator, TYPE_CHECKING, cast, Callable from lang import FlowLangBase, FlowLang # in the implementation from abc import ABC, abstractmethod -from textual.widgets import TabPane, Collapsible -from textual.app import App as TextualApp -from core.signals import Signal -from copy import deepcopy +from textual.widgets import TabPane +from textual.widget import Widget # used for dynamic imports and path management from pathlib import Path @@ -19,25 +12,11 @@ import sys import importlib - -class Flow: - """ - Represents - """ - def __init__(self): - self.flow: FlowLangBase = FlowLang() - self.src: str = "" - - # metadata - self.name: str = "" - self.file_path: Path = Path() - self.is_dirty: bool = False - - def save_file(self): - pass - - def open_file(self): - pass +# used for type checking +if TYPE_CHECKING: + from studio.view import EditorScreen +else: + class EditorScreen(object): pass # must define due to reference in type casting class Model: @@ -45,22 +24,30 @@ class Model: The source of truth for the application state (a Singleton Pattern). Manages the current workspace and open file flows. """ - on_load: Signal = Signal() - on_save: Signal = Signal() - def __init__(self, name: str, project_path: Path) -> None: - # ======== Basic Project Config ======== + def __init__(self, name: str, project_path: Path, view: EditorScreen) -> None: + """Name and project path are passed to initiate the model. The textual app is simply passed as a reference so + that plugins maintain access to it.""" + # ======== Project Attributes ======== self.project_name: str = name # name the user has given the project self.project_path: Path = project_path + self.flow_path: Path | None = None + self._edit_hash: int = 0 # used to check if some text has already been saved... + self.flow: FlowLangBase = FlowLang() + + # ======== View Hook (for access through model/plugin) ======== + self.view: EditorScreen = view # ======== Plugins ======== self.plugins: list[Plugin] = [] # add builtin plugins - from studio.stdplgns import analysis, output, run - for module in (analysis, output, run): + from studio.stdplgns import run, explore, analysis + for module in (run, explore, analysis): for _, obj in inspect.getmembers(module): if isinstance(obj, Plugin): + obj._model = self + obj._view = self.view self.plugins.append(obj) # load all plugins for pp in (self.project_path / "plugins").glob("*.py"): @@ -73,76 +60,78 @@ def __init__(self, name: str, project_path: Path) -> None: # Look for instances of Plugin inside the module for _, obj in inspect.getmembers(module): if isinstance(obj, Plugin): + obj._model = self + obj._view = self.view self.plugins.append(obj) - # ======== Active Flows ======== - self.flows: list[Flow] = [] - self.active_flow: Optional[Flow] = None - - # add default flow - self.flows.append(_:=Flow()) - _.name = "Root" - self.active_flow = _ - - def get_flow_options(self) -> list[str]: - """ - Returns list of tuples formatted for a Textual Select widget. - """ - return [f.name for f in self.flows] - - def create_new_flow(self, name: str, branch_from_current: bool) -> bool: - """Create a new Flow object and return True if the flow was created, otherwise False.""" - if name in (f.name for f in self.flows): - return False - self.flows.append(_:=( - deepcopy(self.active_flow) if self.active_flow and branch_from_current else Flow() - )) - _.name = name - self.active_flow = _ - return True - - def delete_selected_flow(self) -> None: - new_flow_idx: int = self.flows.index(self.active_flow) - 1 - self.flows.remove(self.active_flow) - if len(self.flows) > 0: - self.active_flow = self.flows[new_flow_idx] - else: - self.active_flow = None - - def plugins_save_configs(self): - """Direct all plugins to save configuration of the plugin.""" + # ======== Initialize any children models (plugins) ======== for p in self.plugins: - p.save_configuration() + p.on_initialized() + + def write_file(self, text: str) -> bool: + """Writes to the file and returns True if the file was written to.""" + if self.flow_path and self._edit_hash != (eh:=hash(text)): + self.flow_path.write_text(text) + self._edit_hash = eh + return True + return False + + def read_file(self) -> str | None: + if self.flow_path: + self._edit_hash = hash(text:=self.flow_path.read_text()) + return text + return None + + def open_file(self, path: Path | None): + self.flow_path = path -# ================ Client Implemented ================ +# ================ Plugin Support ================ class Plugin(ABC): """ Any class that inherits from this, becomes a plugin and is expected to implement the methods below. Only one instance of this class is expected for each plugin PER APP. If session/flow-instance-specific behavior is desired, the session change signal must be watched and handled. + IMPORTANT NOTE: The view call self.panel() and then self.control() in that order. Thus, calls may need to be placed + strategically if self.panel() references something in self.controls(). + Required attributes: - name: str # the name of the plugin - - refreshable: bool # if the panel and controls should be called again due to a change in widget objects - (only used by the app screen to determine whether to rerender this plugin) + - model: Model # gives the plugin access to the model + - view: EditorScreen # gives the plugin access to the app """ - @abstractmethod - def __init__(self, model: Model, app: TextualApp) -> None: - pass + def __init__(self) -> None: + # Define the unset required attributes + self.name: str = cast(str, cast(object, None)) + self._model: Model = cast(Model, cast(object, None)) + self._view: EditorScreen = cast(EditorScreen, cast(object, None)) + + @property + def model(self) -> Model: + return self._model + + @property + def view(self) -> EditorScreen: + return self._view + + @property + def cft(self) -> Callable: + """Used to call a textual method/function from another thread (for thread-safety).""" + return self._view.app.call_from_thread @abstractmethod - def panel(self) -> TabPane | None: - """Returns the widget to be displayed in the panel for this plugin.""" - return None + def on_initialized(self) -> None: + """Called when the plugin is fully loaded by the model.""" + pass @abstractmethod - def controls(self) -> tuple[str, list[Collapsible]] | None: + def controls(self) -> Iterator[Widget]: """Returns the controls (in renderable format) for modifying this plugin's behavior.""" - return None + pass @abstractmethod - def save_configuration(self): - """Optional method to implement that is called by the editor when exiting.""" - pass + def panel(self) -> TabPane | None: + """Returns the widget to be displayed in the panel for this plugin.""" + return None diff --git a/src/studio/stdplgns/analysis.py b/src/studio/stdplgns/analysis.py index e69de29..011a596 100644 --- a/src/studio/stdplgns/analysis.py +++ b/src/studio/stdplgns/analysis.py @@ -0,0 +1,306 @@ +""" +This plugin provides access to various analysis features such as causal networks and overall evolution metrics. +""" +# Textual Imports +from textual.widgets import (Collapsible, TabPane, Input, Checkbox, SelectionList, + Button, Label, Sparkline, Select, DataTable) +from textual.widget import Widget +from textual.containers import VerticalScroll +from textual.widgets.selection_list import Selection + +# Standard Imports +from typing import Iterator +from studio.model import Plugin +from core.graph import EventCausalityGraph +from core.numlib import str_to_num, INF +from studio.config import USER_DATA_DIR_PATH +from pyvis.network import Network +import networkx as nx +from statistics import fmean +import math + + +class P(Plugin): + def on_initialized(self) -> None: + self.name = 'analysis' + + # tools + self._causal_graph: EventCausalityGraph | None = None + + # signals + self.view.sig_button_pressed.connect(self.handle_button_press) + + def controls(self) -> Iterator[Widget]: + with Collapsible(title='Causal Network', collapsed=False): + yield Button('Build Graph', id='build-graph', variant="primary") + self.causal_network_event_range = Input(':32') + self.causal_network_event_range.border_title = 'Event Range' + yield self.causal_network_event_range + self.collapse_edges = Checkbox('Collapse Edges') + yield self.collapse_edges + yield Label("\nPersistence") + self.export_format = Select( + [("Gephi", 0), ("GraphML", 1), ("Sparse6", 2), ("Graph6", 3), ("GML", 4), ("Adjacency List", 5), ("Multiline Adjacency", 6)], + allow_blank=False + ) + yield self.export_format + yield Button('Export as Format', id='export-graph') + + with Collapsible(title='VisJS Viewer', collapsed=False): + yield Button('View Graph', id='view-graph', variant="primary") + self.vis_html_path = Input(value="auto", placeholder="e.g., auto or /home/test.html") + self.vis_html_path.border_title = 'HTML File Path' + yield self.vis_html_path + self.vis_open_browser = Checkbox('Open Browser', value=True) + yield self.vis_open_browser + + yield Label("\nCanvas Settings") + self.vis_height = Input(value="800px", placeholder="e.g., 800px") + self.vis_height.border_title = 'Height' + yield self.vis_height + self.vis_width = Input(value="100%", placeholder="e.g., 100%") + self.vis_width.border_title = 'Width' + yield self.vis_width + self.vis_bgcolor = Input(value="#ffffff", placeholder="e.g., #ffffff") + self.vis_bgcolor.border_title = 'Background Color' + yield self.vis_bgcolor + self.vis_font_color = Input(value="", placeholder="e.g., #000000") + self.vis_font_color.border_title = 'Font Color' + yield self.vis_font_color + self.vis_heading = Input(value="") + self.vis_heading.border_title = 'Graph Heading' + yield self.vis_heading + self.vis_cdn = Select( + [("Remote", "remote"), ("Local", "local"), ("Inline", "in_line")], + value="remote", + prompt="CDN Resources", + allow_blank=False + ) + yield self.vis_cdn + + yield Label("\nOptions") + self.vis_toggles = SelectionList( + Selection("Directed Edges", "directed", True), + Selection("Neighborhood Highlight", "neighborhood", True), + Selection("Select Menu", "select_menu", False), + Selection("Filter Menu", "filter_menu", False), + Selection("Hierarchy Layout", "layout", False), + ) + yield self.vis_toggles + yield Label("\nUI Filters") + self.show_buttons = Checkbox('Show Buttons', value=False) + yield self.show_buttons + self.vis_buttons_filter = SelectionList( + Selection("All", "all", False), + Selection("Nodes", "nodes", True), + Selection("Edges", "edges", True), + Selection("Layout", "layout", False), + Selection("Interaction", "interaction", True), + Selection("Manipulation", "manipulation", False), + Selection("Physics", "physics", True), + Selection("Selection", "selection", False), + Selection("Renderer", "renderer", False) + ) + yield self.vis_buttons_filter + + with Collapsible(title='Causal Distribution'): + yield Label('Summery Function:') + self.summary_function = Select([('Min', 0), ('Max', 1), ('Mean', 2)], allow_blank=False) + yield self.summary_function + self.evolution_metrics_event_range = Input(':100') + self.evolution_metrics_event_range.border_title = 'Event Range' + yield self.evolution_metrics_event_range + yield Button('Calculate', id='calculate-metrics', variant="primary") + yield Label() + + def handle_button_press(self, e: Button.Pressed): + _id: str = e.button.id + if _id == 'build-graph': + self._update_causal_graph() + elif _id == 'export-graph': + self._export_as_format() + elif _id == 'view-graph': + self._open_vis_js_viewer() + elif _id == 'calculate-metrics': + self._update_evolution_metrics() + + def _open_vis_js_viewer(self) -> None: + if not self._causal_graph: + self.view.notify('No graph has been built yet!', severity='error') + return + + G: nx.MultiDiGraph = self._causal_graph + + # Get the list of selected internal string values + selected = self.vis_toggles.selected + + # Parse text inputs + f_color = self.vis_font_color.value.strip() + f_color_parsed = f_color if f_color else False + + # Layout requires True or None, so we map it from the selection list + layout_parsed = True if "layout" in selected else None + + # Instantiate the VisJS Network + net = Network( + height=self.vis_height.value.strip() or "800px", + width=self.vis_width.value.strip() or "100%", + directed="directed" in selected, + neighborhood_highlight="neighborhood" in selected, + select_menu="select_menu" in selected, + filter_menu="filter_menu" in selected, + bgcolor=self.vis_bgcolor.value.strip() or "#ffffff", + font_color=f_color_parsed, + layout=layout_parsed, + heading=self.vis_heading.value.strip(), + cdn_resources=self.vis_cdn.value + ) + + # Apply the show_buttons filters based on the selection list + if self.show_buttons.value: + filtered_buttons = self.vis_buttons_filter.selected + if "all" in filtered_buttons: + net.show_buttons(filter_=True) + else: + net.show_buttons(filter_=filtered_buttons) + + # Ingest and Render + net.from_nx(G) + try: + if self.vis_html_path.value == 'auto': + html_path = str(USER_DATA_DIR_PATH.joinpath('temp_vis_js.html')) + else: + html_path = self.vis_html_path.value + if not html_path.endswith('.html'): + html_path += '.html' + net.write_html(html_path, open_browser=self.vis_open_browser.value) + self.view.notify("VisJS graph launched in your browser!") + except Exception as e: + self.view.notify(f"Failed to open viewer: {str(e)}", severity="error") + + def _export_as_format(self): + if not self._causal_graph: + self.view.notify('No graph has been built yet!', severity='error') + return + + funcs = [nx.write_gexf, nx.write_graphml, nx.write_sparse6, nx.write_graph6, + nx.write_gml, nx.write_adjlist, nx.write_multiline_adjlist] + stems = ['.gexf', '.graphml', '.sparse6', '.graph6', '.gml', '.adjlist', '.multi_adjlist'] + stem: str = stems[self.export_format.value] + path: str = str( + self.model.project_path.joinpath( + self.model.flow_path.name + f'_at_{self.causal_network_event_range.value.replace(':', '_')}' + stem + ) + ) + try: + funcs[self.export_format.value](self._causal_graph, path) + self.view.notify(f'Successfully exported at "{path}"') + except Exception as e: + self.view.notify(f"Failed to export graph: {str(e)}", severity="error") + + def panel(self) -> TabPane | None: + # ==== network metrics ==== + self.causal_graph_metrics = DataTable(show_cursor=False, zebra_stripes=True) + self.causal_graph_metrics.add_columns(('Metric', 'metric'), ('Value', 'value')) + for i, m in enumerate(('Edge-Node Ratio', 'Network Density', + 'Longest DAG Path', 'Degree Assortativity Coefficient', + 'Flow Hierarchy')): + self.causal_graph_metrics.add_row(m, 'N/A', key=str(i)) + + # ==== Causal Distributions ==== + self.distance_distribution = Sparkline() + self.connected_abs_distribution = Sparkline() + self.connected_set_distribution = Sparkline() + return TabPane( + self.name.title(), + VerticalScroll( + Collapsible( + self.causal_graph_metrics, + title='Causal Network Metrics', collapsed=False + ), + Collapsible( + Label('[bold] Causal Distance [/bold]'), + self.distance_distribution, + Label('\n[bold] Connected Total [/bold]'), + self.connected_abs_distribution, + Label('\n[bold] Connected Unique [/bold]'), + self.connected_set_distribution, + title='Causal Distributions', collapsed=True + ), + ) + ) + + def _update_causal_graph(self): + # get the event range + rs: list[str] = self.causal_network_event_range.value.split(':') + if len(rs) == 2: rs.append('') # it must be 3 things + r = ( + int(rs[0]) if rs[0] else 0, + str_to_num(rs[1]) if rs[1] else INF, + abs(int(rs[2])) if rs[2] else 1 + ) + self._causal_graph = EventCausalityGraph().build(self.model.flow, r, self.collapse_edges.value) + self._update_causal_metrics_table(self._causal_graph) + + def _update_causal_metrics_table(self, g: EventCausalityGraph) -> None: + """Calculates causal graph metrics and updates the Textual DataTable.""" + if g.number_of_nodes() == 0: + for i in range(len(self.causal_graph_metrics.rows)): + self.causal_graph_metrics.update_cell(str(i), "value", "N/A") + return + try: + edge_node_ratio = g.number_of_edges() / g.number_of_nodes() + edge_node_ratio_str = f"{edge_node_ratio:.3f}" + except ZeroDivisionError: edge_node_ratio_str = "0.000" + density = nx.density(g) + density_str = f"{density:.5f}" + try: + depth = nx.dag_longest_path_length(g) + depth_str = str(depth) + except nx.NetworkXUnfeasible: depth_str = "Cycle Detected" + try: + assortativity = nx.degree_assortativity_coefficient(g) + if math.isnan(assortativity): + assort_str = "0.000" + else: + assort_str = f"{assortativity:.4f}" + except Exception: assort_str = "0.000" + try: + flow = nx.flow_hierarchy(g) + flow_str = f"{flow:.4f}" + except ZeroDivisionError: flow_str = "0.000" + + # update the cells + for i, s in enumerate((edge_node_ratio_str, density_str, depth_str, assort_str, flow_str)): + self.causal_graph_metrics.update_cell(str(i), "value", s) + + def _update_evolution_metrics(self): + # get the event range + rs: list[str] = self.evolution_metrics_event_range.value.split(':') + if len(rs) == 2: rs.append('') # it must be 3 things + a, b, c = ( + int(rs[0]) if rs[0] else 0, + str_to_num(rs[1]) if rs[1] else INF, + abs(int(rs[2])) if rs[2] else 1 + ) + + # get summary function + f = (min, max, fmean)[self.summary_function.value] + self.distance_distribution.summary_function = f + self.connected_abs_distribution.summary_function = f + self.connected_set_distribution.summary_function = f + + # calculate the data to show + causal_distance_data: list[int] = [] + connected_abs_distance_data: list[int] = [] + connected_set_distance_data: list[int] = [] + for event in self.model.flow.events[a:b+(1 if b > 0 else 0):c]: + causal_distance_data.append(event.causal_distance_to_creation) + connected_abs_distance_data.append(len(_:=tuple(event.causally_connected_events))) + connected_set_distance_data.append(len(set(_))) + self.distance_distribution.data = causal_distance_data + self.connected_abs_distribution.data = connected_abs_distance_data + self.connected_set_distribution.data = connected_set_distance_data + + +plugin = P() diff --git a/src/studio/stdplgns/explore.py b/src/studio/stdplgns/explore.py new file mode 100644 index 0000000..e798285 --- /dev/null +++ b/src/studio/stdplgns/explore.py @@ -0,0 +1,559 @@ +""" +This plugin provides an exploratory environment for the evolutions of cellular systems. +""" +# Textual Imports +from rich.text import Text +from textual.widgets import (Collapsible, TabPane, Input, Select, + Checkbox, Label, DataTable as _DataTable, SelectionList, ContentSwitcher) +from textual.widgets.data_table import CellKey +from textual.widget import Widget +from textual.coordinate import Coordinate +from textual.widgets.selection_list import Selection +from textual.containers import VerticalScroll, Horizontal, Vertical +from textual.events import MouseMove + +# Standard Imports +from typing import Iterator, Sequence, cast +from core.numlib import INF, str_to_num, is_infinity +from core.engine import Event as FlowEvent, SpaceState, Cell as FlowCell, DeltaCell, DeltaSpace, DeltaSpaces +from core.prettier import SpaceStateStringFormatter +from core.signals import Signal +from lang.implementation import BaseRule +from studio.model import Plugin, FlowLangBase + + +class DataTable(_DataTable): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sig_mouse_over_inner_cell: Signal[Coordinate | None, int] = Signal() + self.enabled_sig_mouse_over_space_cell: bool = False + + def on_mouse_move(self, event: MouseMove) -> None: + if not self.enabled_sig_mouse_over_space_cell: + return + # Grab the row/col directly from the exact terminal cell the mouse is touching. + meta = event.style.meta # metadata holds the row and column info + # If the mouse is over the header or the empty space below the table, this metadata won't exist. + if not meta or "row" not in meta or "column" not in meta or meta['row'] == -1: + self.sig_mouse_over_inner_cell.emit(None, 0) + return + coord = Coordinate(meta["row"], meta["column"]) + start_x = 0 + PADDING = self.cell_padding + columns = list(self.columns.values()) + for i in range(coord.column): # Calculate start_x ONLY for columns BEFORE the hovered one + col = columns[i] + if col.auto_width: + base_width = max(col.width or 0, col.content_width or 0) + else: + base_width = col.width or col.content_width or 0 + # Add this column's total footprint (width + 1 left pad + 1 right pad) + start_x += base_width + 2 * PADDING + # Calculate the specific character offset inside the cell + virtual_x = event.x + self.scroll_offset.x + # Subtract start_x to zero out the column, then subtract 1 for THIS column's left padding + char_offset = (virtual_x - start_x) - PADDING + + # emit the signal + self.sig_mouse_over_inner_cell.emit(coord, char_offset) + + +class RulesetDashboard(VerticalScroll): + """A scoped container specifically for the ruleset layout.""" + + DEFAULT_CSS = """ + RulesetDashboard Horizontal { + height: auto; + width: 100%; + } + RulesetDashboard Vertical { + width: 1fr; + height: auto; + margin-right: 1; + } + """ + + +class P(Plugin): + def on_initialized(self) -> None: + self.name = 'explore' + + # tools + self.space_state_formatter: SpaceStateStringFormatter = SpaceStateStringFormatter() + self._cell_ids_to_highlight: frozenset[int] = frozenset() + + # attributes + self._render_range: tuple[int, int, int] = (-100, INF, 1) + self._space_columns_limit: int = 1 + self._hidden_space_columns: set[int] = set() + self._columns_control_bitmap: list[bool] = [True, False, False, False, False, False] + + # connect model signals + self.model.flow.on_evolved_n.connect(self.on_evolved) + self.model.flow.on_undone_n.connect(self.on_undo) + self.model.flow.on_clear.connect(self.on_clear) + self.model.flow.on_ruleset_set.connect( + lambda f: self.cft(self._rebuild_ruleset_table, f.ruleset.rules, self.ruleset_table) + ) + + # connect view signals + self.view.sig_input_submit.connect(self.handle_input_submit) + self.view.sig_selection_list_toggled.connect(self.handle_selection_toggle) + self.view.sig_checkbox_changed.connect(self.handle_checkbox_change) + + # temp trackers + self.__last_hover_coord_and_offset: tuple[Coordinate, int] = (Coordinate(0, 0), 0) + + def _column_control_bitmap_zero_out(self): + """Zeros out the column bitmap""" + self._columns_control_bitmap = [False for _ in range(len(self._columns_control_bitmap))] + + def controls(self) -> Iterator[Widget]: + self.render_range = Input(value='-100:', placeholder='e.g. -10: or 3:10', id='render-limit') + self.render_range.border_title = 'Render Range' + yield self.render_range + self.show_ruleset = Checkbox('Show Ruleset', value=True, id='show-ruleset') + yield self.show_ruleset + + with Collapsible(title='Column Controls', collapsed=False): + control_bits = self._columns_control_bitmap + self.column_controls = SelectionList( + Selection("Event Indices", 0, control_bits[0]), + Selection("Causal Distance", 1, control_bits[1]), + Selection("Causally Connected", 2, control_bits[2]), + Selection(" ├─ Unique", 3, control_bits[3]), + Selection(" ├─ Sorted", 4, control_bits[4]), + Selection(" ╰─ Counted", 5, control_bits[5]), + id='column-controls' + ) + self._rebuild_columns(rebuild_rows=False) # must be called here or sometime after to initiated columns. + yield self.column_controls + self.space_columns_limit = Input(str(self._space_columns_limit), type='integer', id='space-columns-limit') + self.space_columns_limit.border_title = 'Space Columns Limit' + yield self.space_columns_limit + self.hidden_space_columns = Input(placeholder='e.g. 5, 10:15, 20', id='hidden-space-columns') + self.hidden_space_columns.border_title = 'Hidden Space Columns' + yield self.hidden_space_columns + + with Collapsible(title='Cell Rendering', collapsed=False): + self.style_controls = SelectionList( + Selection("Cell Styling", 0, True), + Selection(" ╰─ On Symbol", 1, False), + Selection("Show Symbols", 2, True), + Selection("Cell Padding", 3, True), + Selection("Clear on Override", 4, False), + id='style-controls' + ) + yield self.style_controls + self.style_map = Input("auto", placeholder='e.g. auto or A: red', id='style-map') + self.style_map.border_title = 'Style Override' + yield self.style_map + self.symbol_map = Input("auto", placeholder='e.g. auto or A: Z', id='symbol-map') + self.symbol_map.border_title = 'Symbol Map' + yield self.symbol_map + + with Collapsible(title='Hover Explorer', collapsed=False): + self.hovered_info_label = Label() + self._reset_hovered_info_label() + yield self.hovered_info_label + self.hover_explorer = Checkbox('Hover Explorer', id='hover-explorer') + yield self.hover_explorer + self.hover_ruleset_enabled = Checkbox('Hovered Event Ruleset', disabled=True, id='hover-ruleset-enabled') + yield self.hover_ruleset_enabled + self.hover_style = Input("on black", placeholder='e.g. on red or bold blue', id='hover-style') + self.hover_style.border_title = 'Hover Style' + yield self.hover_style + + yield Label() + + def handle_input_submit(self, e: Input.Submitted): + _id: str = e.input.id + if _id == 'render-limit': + try: + rs: list[str] = e.value.strip().split(':') + if len(rs) == 2: rs.append('') # it must always be 3 things + self._render_range = ( + int(rs[0]) if rs[0] else 0, + str_to_num(rs[1]) if rs[1] else INF, + abs(int(rs[2])) if rs[2] else 1 + ) + self._rebuild_rows() + except: + self.view.notify('Invalid render range.', severity='warning') + e.input.value = '{0}:{1}:{2}'.format(*self._render_range) + + elif _id == 'space-columns-limit': + try: + v: int = int(e.value.strip()) + if not 0 <= v <= 1000: + raise ValueError + self._space_columns_limit = int(e.value) + self._rebuild_columns() + except: + self.view.notify('Invalid column space limit. Value must be between 0 and 1000.', severity='warning') + e.input.value = str(self._space_columns_limit) + + elif _id == 'hidden-space-columns': + try: + self._hidden_space_columns.clear() + values = e.value.replace(' ', '').split(',') + if values[0] != '': + for r in values: + if ':' in r: + a, b = r.split(':'); a, b = abs(int(a)), abs(int(b)) + if a > 1000 or b > 1000: raise ValueError + for i in range(a, b + 1): + self._hidden_space_columns.add(i) + else: + a = abs(int(r)) + if a > 1000: raise ValueError + self._hidden_space_columns.add(a) + self._rebuild_columns() + except: + self.view.notify('Invalid hidden ranges. Values/Ranges must be between 0 and 1000.', severity='warning') + + elif _id in ('style-map', 'symbol-map'): + self._handle_styling_update() + + elif _id == 'hover-style': + self.space_state_formatter.cell_id_style = e.value.strip() + + def handle_selection_toggle(self, e: SelectionList.SelectionToggled): + _id: str = e.selection_list.id + if _id == 'column-controls': + self._column_control_bitmap_zero_out() + for i in e.selection_list.selected: + self._columns_control_bitmap[i] = True + self._rebuild_columns() + if _id == 'style-controls': + self._handle_styling_update() + + def handle_checkbox_change(self, e: Checkbox.Changed): + _id: str = e.checkbox.id + if _id == 'hover-explorer': + self.data_table.enabled_sig_mouse_over_space_cell = e.value + self.hover_ruleset_enabled.disabled = not e.value + if not e.value: + self.hover_ruleset_enabled.value = self.hovered_ruleset_container.display = False + self._reset_hovered_info_label() + self._cell_ids_to_highlight = frozenset() + if _id == 'hover-ruleset-enabled' and self.hover_explorer.value: + self.hovered_ruleset_container.display = e.value + if not e.value: + self.hovered_ruleset_table.clear() + if _id == 'show-ruleset': + self.ruleset_container.display = e.value + + def panel(self) -> TabPane | None: + # Ruleset Table + self.ruleset_table = _DataTable(id='ruleset-table', show_cursor=False) + self.ruleset_table.add_columns('Selector', 'Target', 'Type', 'Group') + self.ruleset_container = Vertical( + Label('[bold] Active Ruleset Table [/bold]'), + self.ruleset_table + ) + + # Ruleset Table + self.hovered_ruleset_table = _DataTable(id='hovered-ruleset-table', show_cursor=False) + self.hovered_ruleset_table.add_columns('Selector', 'Target', 'Type', 'Group') + self.hovered_ruleset_container = Vertical( + Label('[bold] Hovered Event Ruleset Table [/bold]'), + self.hovered_ruleset_table, + ) + self.hovered_ruleset_container.display = False + + # Evolution Table + self.data_table = DataTable(id='data-table') + self.data_table.sig_mouse_over_inner_cell.connect(self._handle_mouse_over_data_table) + return TabPane( + self.name.title(), + RulesetDashboard( + Horizontal( + self.ruleset_container, + self.hovered_ruleset_container + ), + Label('[bold] Evolution Table [/bold]'), + self.data_table + ) + ) + + def _handle_styling_update(self): + control_bitmap: list[bool] = [False, False, False, False, False] + for i in self.style_controls.selected: control_bitmap[i] = True + + try: + style_map: dict[str, str] = { + k.strip(): v.strip() for k, v in (p.split(':') for p in self.style_map.value.split(',')) + } if self.style_map.value != 'auto' else None + + symbol_map: dict[str, str] = { + k.strip(): v.strip() for k, v in (p.split(':') for p in self.symbol_map.value.split(',')) + } if self.symbol_map.value != 'auto' else None + except: + self.view.notify('Invalid style map.', severity='error') + return + + # noinspection PyTypeChecker + self.space_state_formatter.config( + *control_bitmap[:4], + style_map, + control_bitmap[4], + symbol_map + ) + + # noinspection PyTypeChecker + self._rebuild_ruleset_table(self.model.flow.ruleset.rules, self.ruleset_table) + self._rebuild_rows() + + def _reset_hovered_info_label(self): + self.hovered_info_label.content = """[bold]Event Info[/bold] +• ---- +• ---- +• ---- +• ---- +• ---- +• ---- +• ---- + +[bold]Cell Info[/bold] +• ---- +• ---- +• ---- +• ---- +""" + + def _handle_mouse_over_data_table(self, coord: Coordinate | None, offset: int) -> None: + def reset_highlighted(): + self._reset_hovered_info_label() + if self._cell_ids_to_highlight: + self._cell_ids_to_highlight = frozenset() + self._rebuild_rows() + if (not coord + or offset == -1 + or self.data_table.coordinate_to_cell_key(coord).column_key in ('event', 'distance', 'connected')): + reset_highlighted() + return + + # calculate offset of the cell index (because of different padding/rendering options) + cell_content: str = str(self.data_table.get_cell_at(coord)) + if cell_content.startswith(' '): # if padding of " " around symbols is being used. + if cell_content.startswith(' '): # if not rendering symbols but blocks of " " + offset = offset // 2 # one extra symbol needs to be removed + else: + offset = offset // 3 # two extra symbols need to be removed + + # if the last cell was the same + if self.__last_hover_coord_and_offset == (coord, offset): + return + self.__last_hover_coord_and_offset = (coord, offset) + + cell_key: CellKey = self.data_table.coordinate_to_cell_key(coord) + row_idx: int = int(cell_key.row_key.value) + column_idx: int = int(cell_key.column_key.value) + + # grab all relevant information about the selected space + flow: FlowLangBase = self.model.flow + event: FlowEvent = flow.events[row_idx] + spaces: tuple[tuple[DeltaSpaces, DeltaSpace, SpaceState], ...] = tuple(event.spaces_with_metadata) + space_state: SpaceState = spaces[column_idx][2] + + # update the rows + if offset >= len(space_state): + reset_highlighted() + return + flow_cell: FlowCell = space_state.get_all_cells()[offset] + self._cell_ids_to_highlight = frozenset((id(flow_cell),)) + self._rebuild_rows() + + # update the hover info labels + try: + affected_cells: DeltaCell = tuple(event.affected_cells)[column_idx] + created_cells: int = len(affected_cells.new_cells) + destroyed_cells: int = len(affected_cells.destroyed_cells) + except IndexError: + created_cells: None = None + destroyed_cells: None = None + try: # in separate try block because of the destroyed_at maybe not existing + cell_destroyed_at: int = flow_cell.destroyed_at[column_idx] + lifespan: int = cell_destroyed_at - flow_cell.created_at + except IndexError: + cell_destroyed_at: None = None + lifespan: None = None + connected_events = tuple(event.causally_connected_events) + self.hovered_info_label.content = f"""[bold]Event #{event.time} | Space #{column_idx}[/bold] +• Branched Spaces: {len(spaces) - 1} +• Space Size: {len(space_state)} +• Causal Distance: {event.causal_distance_to_creation} +• Connected Total: {len(connected_events)} +• Connected Unique: {len(set(connected_events))} +• Created Cells: {created_cells} +• Destroyed Cells: {destroyed_cells} + +[bold]Cell #{offset}[/bold] +• Quanta: {flow_cell.quanta} +• Created at: {flow_cell.created_at} +• Destroyed at: {cell_destroyed_at} +• Lifespan: {lifespan} +""" + + # place the rule (and its chain if there is one) in the hovered ruleset table. + if self.hover_ruleset_enabled.value: + applied_rule: BaseRule = spaces[column_idx][0].rule + if applied_rule: + self._rebuild_ruleset_table(applied_rule.chain, self.hovered_ruleset_table, hide_disabled=True, remember_old_row_length=True) + + def _rebuild_ruleset_table(self, rules: Sequence[BaseRule], + table: DataTable, + hide_disabled: bool = False, + remember_old_row_length: bool = False) -> None: + # we have the f parameter because this is called by a Flow signal. + def print_rule(rule: BaseRule) -> tuple[Text, Text]: + selectors: list[Text] = [] + for s in rule.selector: + sv = s.selector + if isinstance(sv, str | bytes): + selectors.append(self.space_state_formatter.convert_pure_str(sv)) + else: + selectors.append(Text(str(sv))) + targets: list[Text] = [] + for t in rule.target: + tv = t.target + if isinstance(tv, int): + targets.append(Text(str(tv))) + else: # when `tv` is Sequence[Cell] + targets.append(self.space_state_formatter.convert_pure_str(''.join(str(c) for c in tv))) + return (Text(', ').join(selectors) if len(selectors) > 1 else selectors[0], + Text(', ').join(targets) if len(targets) > 1 else targets[0]) + + old_rows: int = table.row_count + table.clear() + rule: BaseRule + for i, rule in enumerate(rules): + if hide_disabled and rule.disabled: + continue + table.add_row( + *print_rule(rule), rule.__class__.__name__, rule.group, + label=str(i) + ) + old_rows -= 1 + if remember_old_row_length and old_rows: + for _ in range(old_rows): table.add_row() + + def on_evolved(self, f: FlowLangBase, steps: int) -> None: + cft = self.cft + dt = self.data_table + if not dt.row_count: + steps += 1 # to include the first space state + flush_mode: bool = self._render_range[0] < 0 and is_infinity(self._render_range[1]) + render_limit: int = abs(self._render_range[0]) # only used when flush mode is true + if flush_mode and steps >= render_limit: + cft(self._rebuild_rows) + else: + short_circuit: bool = False # just a little optimization for large loops + for event in f.events[-steps:]: + if short_circuit or flush_mode and dt.row_count >= render_limit: + short_circuit = True + cft(lambda: dt.remove_row(dt.coordinate_to_cell_key(Coordinate(0, 0)).row_key)) + cft(self._add_row, event) + cft(self._refresh_column_widths) + if not flush_mode: + dt.scroll_end(animate=False) + + def on_undo(self, f: FlowLangBase, steps: int) -> None: + # NOTE: this function is not very optimized for updates, but premature optimization is the root of all evil. + cft = self.cft + dt = self.data_table + old_rows_count = len(f.events) + steps + for i in range(steps): + try: cft(dt.remove_row, str(old_rows_count - i - 1)) + except: pass + if self._render_range[0] < 0 and is_infinity(self._render_range[1]): # if flushing + cft(self._rebuild_rows) + cft(self._refresh_column_widths) + + def on_clear(self): + try: + self.cft(self.data_table.clear) + except RuntimeError: # this function may or may not be called within the main thread. + self.data_table.clear() + + def _add_row(self, event: FlowEvent): + columns = [] + + # Process the info columns + control_bitmap = self._columns_control_bitmap + if control_bitmap[2]: + if control_bitmap[3]: # if we are collapsing it + connected = set(event.causally_connected_events) + else: + connected = tuple(event.causally_connected_events) + if control_bitmap[5]: # if we are counting the causally connect (to display that metric instead) + connected = len(connected) + elif control_bitmap[4]: + connected = sorted(connected) + else: + connected = None # just to satisfy the IDE wanting a name in the space + for data, show in zip((event.time, + event.causal_distance_to_creation, + connected), + control_bitmap): + if show: columns.append(data) + + # Process the space columns + spaces: Iterator[SpaceState] = event.spaces + formatter: SpaceStateStringFormatter = self.space_state_formatter + cells_to_highlight: frozenset[int] = self._cell_ids_to_highlight + hidden: set[int] = self._hidden_space_columns + for i in range(self._space_columns_limit): + try: + space = spaces.__next__() # we must always increment next (even though it may be hidden, that is what makes the check work) + if i in hidden: + continue + columns.append(formatter(space, cells_to_highlight)) + except StopIteration: + break + # Add everything as a row + self.data_table.add_row( + *columns, + key=str(event.time) + ) + + def _rebuild_rows(self) -> None: + a, b, c = self._render_range + dt = self.data_table + old_x, old_y = dt.scroll_x, dt.scroll_y + dt.clear() + for event in self.model.flow.events[a:b + (1 if b > 0 else 0):c]: + self._add_row(event) + self._refresh_column_widths() + dt.scroll_to(x=old_x, y=old_y, animate=False) + + def _rebuild_columns(self, rebuild_rows: bool = True) -> None: + dt = self.data_table + old_x, old_y = dt.scroll_x, dt.scroll_y + dt.clear(columns=True) + if self._columns_control_bitmap[0]: + dt.add_column('Event', key='event') + if self._columns_control_bitmap[1]: + dt.add_column('Distance', key='distance') + if self._columns_control_bitmap[2]: + dt.add_column('Connected', key='connected') + hidden: set[int] = self._hidden_space_columns + for i in range(self._space_columns_limit): + if i in hidden: + continue + dt.add_column(_:=str(i), key=_) + if rebuild_rows: + self._rebuild_rows() + dt.scroll_to(x=old_x, y=old_y, animate=False) + + def _refresh_column_widths(self) -> None: + """Update the column widths as Textual does not currently do that for us when removing rows.""" + dt = self.data_table + if 0 <= (rc:=(dt.row_count - 1)): + # noinspection PyProtectedMember + dt._update_column_widths( + {dt.coordinate_to_cell_key(Coordinate(rc, i)) for i in range(len(dt.columns))} + ) + +plugin = P() diff --git a/src/studio/stdplgns/output.py b/src/studio/stdplgns/output.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/studio/stdplgns/run.py b/src/studio/stdplgns/run.py index e69de29..9df05e8 100644 --- a/src/studio/stdplgns/run.py +++ b/src/studio/stdplgns/run.py @@ -0,0 +1,207 @@ +""" +This plugin provides basic running/undoing features, hot-reload, and several other utilities for interactive with flows. +""" +# Textual Imports +from textual.widgets import Collapsible, TabPane, Input, Checkbox, Button, ProgressBar, Label, RichLog +from textual.widget import Widget +from textual.containers import ScrollableContainer, Horizontal +from textual.timer import Timer + +# Standard Imports +from typing import Iterator +import time +import psutil +import os +import sys +from rich.traceback import Traceback as RichTraceback +from textual.worker import Worker + +from studio.model import Plugin, FlowLangBase + + +class P(Plugin): + def on_initialized(self) -> None: + self.name = 'run' + + # Connect buttons to our execution logic + self.view.sig_button_pressed.connect( + self.handle_btn_press + ) + self.view.sig_checkbox_changed.connect( + self.handle_checkbox_change + ) + + # Connect flow signals to update progress bar + FlowLangBase.on_evolved_step.connect(self._handle_progress_updates) + FlowLangBase.on_undone_step.connect(self._handle_progress_updates) + + # Attributes + self._process = psutil.Process(os.getpid()) + self._prev_flowlang_src: str = '' # for diff checking + self._hot_after_n_changes: int = 0 # for fast reference + self._running_thread: Worker | None = None # for checking and managing the current thread + + def controls(self) -> Iterator[Widget]: + # NOTE: there aren't many settings for the run tab due to most controls being available through the DSL. + self.undo_steps = Input(type='integer', value='1') + self.undo_steps.border_title = 'Undo Button Steps' + yield self.undo_steps + with Collapsible(title='Hot Reload', collapsed=False): + self.hot_mode = Checkbox('Enable hot reload mode', id='hot-reload') + yield self.hot_mode + self.hot_after_n_changes = Input(type='integer', value='1', id='hot-after-change') + self.hot_after_n_changes.border_title = 'After n changes' + yield self.hot_after_n_changes + with Collapsible(title='Program Log', collapsed=False): + self.mem_profile = Checkbox('Show memory profile') + yield self.mem_profile + self.show_traceback = Checkbox('Show tracebacks') + yield self.show_traceback + + yield Label() + + self.hot_reload_timer: Timer = self.view.set_interval( + 1, self._handle_hot_reload, + pause=True # start paused. + ) + + def panel(self) -> TabPane | None: + # Progress Bar Widget + self.progress_bar = ProgressBar(total=100, show_eta=True, id="run-progress-bar") + self.progress_container = Collapsible( + self.progress_bar, + title="Execution Progress", + collapsed=False + ) + + # Standard Output Widget + self.log_view = RichLog(id="run-log-view", highlight=True, markup=True, wrap=True) + self.log_container = Collapsible( + self.log_view, + Button('Clear Log', id="clear-log"), + Label(), + title="Program Log", collapsed=False + ) + + return TabPane( + self.name.title(), + ScrollableContainer( + self.progress_container, + self.log_container + ) + ) + + def handle_btn_press(self, e: Button.Pressed): + btn: str = e.button.id + if btn == 'btn-run': + self.execute_run() + elif btn == 'btn-undo': + self.execute_undo() + elif btn == 'btn-clear': + self.model.flow.clear_evolution() + elif btn == 'clear-log': + self.log_view.clear() + self.log_view.write(f"[bold green] --- Log Cleared --- [/bold green]") + + def handle_checkbox_change(self, e: Checkbox.Changed): + btn: str = e.checkbox.id + if btn == 'hot-reload': + self.hot_after_n_changes.disabled = e.checkbox.value + if e.checkbox.value: + self._hot_after_n_changes = int(self.hot_after_n_changes.value) + self.hot_reload_timer.resume() + else: + self.hot_reload_timer.pause() + + def _flow_src_diff_check(self) -> int: + a: str = self.view.code_editor_text_area.text + b: str = self._prev_flowlang_src + return sum(x != y for x, y in zip(a, b)) + abs(len(a) - len(b)) + + def _handle_hot_reload(self) -> None: + # import time + # self.log_view.write(time.time()) # for debugging timer + if self._flow_src_diff_check() >= self._hot_after_n_changes: # only hot-reload after n changes to src + self._prev_flowlang_src = self.view.code_editor_text_area.text + self.execute_run() + + def _handle_progress_updates(self, f: FlowLangBase) -> None: + self.cft( # we must call from the main thread to be thread-safe according to docs + self.progress_bar.update, + progress=f.n_step_progress * 100 + ) + # import time # to test slowdowns + # time.sleep(0.5) + + def _execute(self) -> None: + # use self.cft to be thread-safe on textual side (according to docs on Workers) + if self.mem_profile.value: + mem_start = self._process.memory_info().rss / 1024 / 1024 + start_time = time.perf_counter() + + # execute the FlowLang + try: + self.model.flow.interpret(self.view.code_editor_text_area.text) + except Exception as e: + # Handle the exception + if self.show_traceback.value: + self.cft( + self.log_view.write, + RichTraceback.from_exception(*sys.exc_info(), word_wrap=True) + ) + else: + self.cft( + self.log_view.write, + f"[bold red]Execution Error:[/bold red] {str(e)}" + ) + + # show profiler info + if self.mem_profile.value: + mem_end = self._process.memory_info().rss / 1024 / 1024 + # noinspection PyUnboundLocalVariable + elapsed_time = time.perf_counter() - start_time + # noinspection PyUnboundLocalVariable + mem_diff = mem_end - mem_start + self.cft( + self.log_view.write, + f"[bold]Time Spent:[/bold] {elapsed_time:.4f} seconds\n" + f"[bold]Memory Change:[/bold] {mem_diff:+.2f} MB\n" + f"[bold]Total Studio Memory:[/bold] {mem_end:.2f} MB\n" + ) + + def execute_run(self) -> None: + """Handles the flow execution and updates the UI components.""" + flow_path = self.model.flow_path + if not flow_path: + self.log_view.write("[bold red]Studio Error:[/bold red] No flow selected to run.") + return + if self._running_thread and self._running_thread.is_running: # do not run while thread is active + self.log_view.write("[bold red]Studio Error:[/bold red] A flow thread is currently running.") + return + self._running_thread = self.view.run_worker( + self._execute, + thread=True + ) + self.log_view.write(f'[bold green]Run {flow_path.name}...[/bold green]') + # TODO: maybe more info will be logged at some point (if deemed useful) + + def execute_undo(self) -> None: + """Handles the flow undo and updates the UI components.""" + flow_path = self.model.flow_path + if not flow_path: + self.log_view.write("[bold red]Studio Error:[/bold red] No flow selected to undo.") + return + if self._running_thread and self._running_thread.is_running: # do not run while thread is active + self.log_view.write("[bold red]Studio Error:[/bold red] A flow thread is currently running.") + return + try: + steps: int = int(self.undo_steps.value) + self._running_thread = self.view.run_worker( + lambda: self.model.flow.undo(steps), + thread=True + ) + self.log_view.write(f'[bold green]Undo last {steps} steps...[/bold green]') + except: + self.log_view.write("[bold red]Studio Error:[/bold red] Could not execute undo command.") + +plugin = P() diff --git a/src/studio/styles.tcss b/src/studio/styles.tcss index 654d251..6c3741c 100644 --- a/src/studio/styles.tcss +++ b/src/studio/styles.tcss @@ -49,20 +49,6 @@ ModalDialog { height: auto; } -/* Inputs and Border Titles */ -ModalDialog Input { - width: 100%; - margin: 1 0; /* Vertical margin gives space for the border title */ - background: #252526; - border: round #444444; - color: #e0e0e0; -} - -ModalDialog Input:focus { - border: round #007acc; - background-tint: #252526; -} - /* Notes (Static text blocks) */ .modal-note { width: 100%; @@ -162,7 +148,6 @@ EditorScreen { #workspace { width: 2fr; height: 100%; - border-right: solid #2b2b2b; layout: vertical; } @@ -171,8 +156,7 @@ EditorScreen { padding: 1 2; } -/*The flow selector dropdown*/ -#select-flow { +#select-flow { /*The flow selector dropdown*/ width: 10; } @@ -190,6 +174,7 @@ EditorScreen { width: 1fr; height: 100%; background: #181818; + border-left: solid #2b2b2b; min-width: 30; } @@ -198,6 +183,31 @@ EditorScreen { COMPONENT STYLES ========================================= */ +Input { + width: 1fr; + background: rgba(0, 0, 0, 0); + border: round #7a7a7a; + color: #e0e0e0; +} + +Input:focus { + border: round #007acc; + background-tint: #252526; +} + +RichLog { + overflow: auto; +} + +Collapsible { + padding-right: 1; + padding-bottom: 1; +} + +/* ========================================= + COMPONENT CLASSES + ========================================= */ + .full-width { width: 1fr; } diff --git a/src/studio/view.py b/src/studio/view.py index 4e9439d..f40d00a 100644 --- a/src/studio/view.py +++ b/src/studio/view.py @@ -1,24 +1,30 @@ """View/Controller side of the MVC paradigm +LongTermTODO: +- Make each flow session have its own text editor. +- Add edit buttons create, rename, and delete files. + Policies: - For software design reasons, it is best to make the user-flow from welcome screen to editor irreversible for the current process so that we don't have to introspectively modify state if the user selects a different project. -This decision was made due to the ease of plugin implementation and project saving. +This decision was made due to the ease of plugin implementation and project saving. At some point, however, we may +prefer to have a checkbox called exit to project manager when ctrl+q is pressed. """ from pathlib import Path from typing import cast, Iterable from textual.app import App, ComposeResult -from textual.containers import Container, Center, Horizontal, Vertical +from textual.containers import Container, Center, Horizontal, Vertical, ScrollableContainer from textual.screen import Screen, ModalScreen from textual.widgets import ( - DirectoryTree as _DT, TextArea, Button, Label, - Select, TabbedContent, OptionList, Input, + DirectoryTree as _DirectoryTree, TextArea as _TextArea, Button, Label, + Select, TabbedContent, OptionList, Input, SelectionList, Footer, ContentSwitcher, Static, Checkbox ) from textual.widgets.option_list import Option, DuplicateID as DuplicateIDError from textual import on from studio import config from studio import model +from core.signals import Signal # we don't use Textual builtin signal system due to limitation with widget mounting being required first. LOGO: str = r"""______ _ ______ _ _____ _ _ _ @@ -36,15 +42,76 @@ def __init__(self): self.styles.width = '1fr' # make it take up as much space as possible -class DirectoryTree(_DT): +class DirectoryTree(_DirectoryTree): def filter_paths(self, paths: Iterable[Path]) -> Iterable[Path]: for path in paths: - if str(path).endswith("__") or str(path).startswith("__"): + if str(path.stem) == 'plugins' or str(path).endswith("__") or str(path).startswith("__"): continue if path.is_dir() or any(map(path.match, config.SUPPORTED_FILE_TYPES)): yield path +class TextArea(_TextArea): + def load_text(self, text: str | None) -> None: + """Overrides the load_text method so that placeholders work properly...""" + # NOTE: edit history is cleared + if text is None: + super().load_text("") + self.disabled = True + self.placeholder = "// Select a .flow file to begin..." + return + if self.disabled: self.disabled = False + super().load_text(text) + self.placeholder = f"// This file is empty!\n// Start typing to edit this file..." + + +# import re +# class CustomHighlightTextArea(TextArea): +# # 1. Define your custom highlighting rules as (regex_pattern, highlight_name) +# # The 'highlight_name' should match standard names used in Textual themes +# # (e.g., "keyword", "string", "comment", "number", "variable", "function"). +# HIGHLIGHT_RULES = [ +# (r"\b(def|class|return|if|else|elif|import|from)\b", "keyword"), +# (r'".*?"|\'.*?\'', "string"), +# (r"#.*", "comment"), +# (r"\b\d+\b", "number"), +# (r"\b[A-Z][a-zA-Z0-9_]*\b", "type"), # Class names +# ] +# +# def on_mount(self) -> None: +# # Pre-compile the regexes for performance when the widget mounts +# self._compiled_rules = [ +# (re.compile(pattern), name) +# for pattern, name in self.HIGHLIGHT_RULES +# ] +# +# def _build_highlight_map(self) -> None: +# """Override to apply custom regex-based highlights instead of tree-sitter.""" +# +# # 1. Clear the existing line cache and highlight map +# self._line_cache.clear() +# self._highlights.clear() +# +# # 2. Iterate over every line in the document +# for line_index in range(self.document.line_count): +# line_text = self.document[line_index] +# +# # 3. Apply each regex rule to the current line +# for regex, highlight_name in self._compiled_rules: +# for match in regex.finditer(line_text): +# # 4. CRITICAL: Convert Python's character indices to byte offsets. +# # Textual's `_render_line` expects byte offsets because that is +# # what Tree-sitter natively outputs. +# start_char, end_char = match.span() +# start_byte = len(line_text[:start_char].encode("utf-8")) +# end_byte = len(line_text[:end_char].encode("utf-8")) +# +# # 5. Append the highlight tuple for this line +# self._highlights[line_index].append( +# (start_byte, end_byte, highlight_name) +# ) + + class ModalDialog(ModalScreen[dict]): """ A flexible modal with border-titled inputs, notes, and dynamic buttons. @@ -155,6 +222,7 @@ def handle_modal_result(result: dict): self.notify('Please enter a valid path to a directory.', severity='error') return try: + # noinspection PyUnresolvedReferences self.query_one("#recents-list").add_option(Option(f'{name} [grey]({path})[/grey]', name)) config.RecentProjects.add(name, path) self.notify(f"Loaded project at: {path}") @@ -190,11 +258,12 @@ def handle_modal_result(result: dict): def btn_open_project(self): _: OptionList = cast(OptionList, self.query_one("#recents-list")) if i:=_.highlighted_option: - self.app.MODEL = model.Model( # create an instance of model side of the MVC design - i.id, # we use id as the field to store the name of the project - config.RecentProjects.get_path(i.id) + self.dismiss( + { + "project_name": i.id, + "project_path": config.RecentProjects.get_path(i.id) + } ) - self.app.push_screen("editor") else: self.notify('Please select a project to open!', severity='warning') @@ -224,24 +293,6 @@ class EditorScreen(Screen): ("ctrl+shift+f1", "toggle_max", "Toggle Max"), ] - def __refresh_flow_selector__(self) -> None: - """Call to refresh the flow selector""" - # load flows - m: model.Model = self.app.MODEL - ol: Select = self.query_one("#select-flow") - ol.set_options([(f.name, i) for i, f in enumerate(m.flows)]) - try: - # noinspection PyProtectedMember - ol._init_selected_option(m.flows.index(m.active_flow)) # select the first option - except: pass - - def __refresh_plugin_components__(self) -> None: - pass - - def on_mount(self) -> None: - self.__refresh_flow_selector__() - self.__refresh_plugin_components__() - def compose(self) -> ComposeResult: # --- LEFT COLUMN: Project Files --- with Vertical(id="project-directory"): @@ -253,43 +304,79 @@ def compose(self) -> ComposeResult: with Vertical(id="workspace"): # Top Toolbar with Horizontal(id='workspace-toolbar'): - yield Label("▼ ", classes='gray') - yield Select((), id="select-flow", compact=True) - yield Button('+', id="btn-add-flow", compact=True, classes='increment-btn green') - yield Button('-', id="btn-sub-flow", compact=True, classes='increment-btn red') + self.open_file_label = Label("No Open File", classes='gray') + yield self.open_file_label yield Spacer() - # yield Label("No Open File", classes='gray') - # yield Spacer() yield Button("Run", id="btn-run", classes="action-btn green", compact=True) - yield Label("|", classes="separator") - yield Button("Debug", id="btn-debug", classes="action-btn orange", compact=True) - yield Label("|", classes="separator") - yield Button("Clear", id="btn-clear", classes="action-btn red", compact=True) + yield Label("| ", classes="gray") + yield Button("Undo", id="btn-undo", classes="action-btn orange", compact=True) + yield Label("| ", classes="gray") + yield Button("clear", id="btn-clear", classes="action-btn red", compact=True) # Code Editor - yield (_:=TextArea.code_editor( - text="// Select a .flow file to begin...", + self.code_editor_text_area: TextArea = TextArea.code_editor( + text="", id="code-editor", disabled=True - )) + ) + yield self.code_editor_text_area # _.register_language() # Plugin Panel with TabbedContent(id="plugin-panel"): # loop through the plugin TabPanes and yield them here - pass + for plugin in self.app.MODEL.plugins: + if _:=plugin.panel(): + yield _ # --- RIGHT COLUMN: Plugin Control Menu --- with Vertical(id="plugin-controls"): - yield Label("⭘ Run Settings", classes="pane-header", id="plugin-controls-header") + yield Label("", classes="pane-header", id="plugin-controls-header") with ContentSwitcher(id="sidebar-switcher"): # loop through the collapsable's that the plugin provides, and place in Vertical containers. - pass + for i, plugin in enumerate(self.app.MODEL.plugins): + with ScrollableContainer(id=f'tab-{i+1}'): + for c in plugin.controls(): + yield c # --- Footer --- yield Footer() - # --- ACTION HANDLERS --- + # ==== Panel and Controls ==== + def on_tabbed_content_tab_activated(self, event: TabbedContent.TabActivated): + """Dynamically switches the Right Sidebar content AND Title.""" + container: ContentSwitcher = self.query_one('#sidebar-switcher') + container.current = event.pane.id + # noinspection PyProtectedMember + self.query_one('#plugin-controls-header').content = f"⭘ {event.pane._title}" + + # ==== File Manager ==== + def on_directory_tree_file_selected(self, event: DirectoryTree.FileSelected): + m: model.Model = self.app.MODEL + if not event.path.exists(): + self.notify("That file no longer exists!", severity="error") + self.query_one(DirectoryTree).reload() + return + self.action_save_file() + m.open_file(event.path) + self.code_editor_text_area.text = m.read_file() + self.open_file_label.content = event.path.name + + @on(Button.Pressed, '#btn_refresh_project_dir') + def btn_refresh_project_dir(self): + self.query_one(DirectoryTree).reload() + self.notify(f"Refreshed Project Directory...") + + # ==== Action Handlers ==== + def action_run(self): + """Action to press the run button upon this action...""" + self.query_one('#btn-run').press() + + def action_save_file(self): + m: model.Model = self.app.MODEL + if m.write_file(self.code_editor_text_area.text): + self.notify(f"Saved the \"{m.flow_path.name}\" file.") + def action_toggle_left_sidebar(self): sidebar = self.query_one("#project-directory") sidebar.display = not sidebar.display @@ -314,117 +401,74 @@ def action_toggle_max(self): else: self.maximize(self.focused) - # ==== Top Bar ==== - @on(Button.Pressed, '#btn-add-flow') - def btn_add_flow(self): - def handle_modal_result(result: dict) -> None: - if result["pressed_button"] == "Cancel": - return - if not result["input"]["flow_name"]: - self.notify("A new flow must be given a name!", severity="error") - return - if not self.app.MODEL.create_new_flow(result["input"]["flow_name"], result["checkbox"]["branch_checkbox"]): - self.notify(f"A flow with name \"{result["input"]["flow_name"]}\" already exists.", severity="error") - return - self.__refresh_flow_selector__() - self.notify(f"Created the \"{self.app.MODEL.active_flow.name}\" flow session...") - - # Push the screen with the configuration and callback - self.app.push_screen( - ModalDialog( - title="Flow Creation Tool", - fields=[ - { - "type": "note", - "text": "Create a new flow session and optionally branch it from the current flow session." - }, - { - "type": "input", - "prompt": "New Flow Name", - "placeholder": "e.g. Flow1", - "id": "flow_name" - }, - { - "type": "checkbox", - "label": "Branch from current flow session", - "id": "branch_checkbox", - } - ], - buttons=["Create", "Cancel"] - ), - callback=handle_modal_result - ) - - @on(Button.Pressed, '#btn-sub-flow') - def btn_remove_flow(self): - if not self.app.MODEL.active_flow: - self.notify("Empty flow sessions!", severity="error") - return - def handle_modal_result(result: dict): - if result.get("pressed_button") == "Yes": - self.notify(f"Deleted the \"{self.app.MODEL.active_flow.name}\" flow session...") - self.app.MODEL.delete_selected_flow() - self.__refresh_flow_selector__() - self.app.push_screen( - ModalDialog( - title="Delete Flow?", - fields=[ - { - "type": "note", - "text": "Please confirm flow deletion..." - } - ], - buttons=["Yes", "No", "Cancel"] - ), - callback=handle_modal_result - ) + # ==== Initial Setup and Signal Connections ==== + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) - @on(Select.Changed, '#select-flow') - def select_flow(self, event: Select.Changed): - if isinstance(event.value, int): - self.app.MODEL.active_flow = self.app.MODEL.flows[event.value] + # ==== Signals ==== + self.sig_button_pressed: Signal[Button.Pressed] = Signal() + self.sig_checkbox_changed: Signal[Checkbox.Changed] = Signal() + self.sig_input_submit: Signal[Input.Changed] = Signal() + self.sig_selection_list_toggled: Signal[SelectionList.SelectionToggled] = Signal() + self.sig_select_changed: Signal[Select.Changed] = Signal() + self.sig_save_config_directive: Signal = Signal() - @on(Button.Pressed, "#btn-clear") - def btn_clear(self): - pass + @on(Button.Pressed) + def _emit_button_signals(self, event: Button.Pressed) -> None: + """Handle emitting the button pressed signal""" + self.sig_button_pressed.emit(event) - @on(Button.Pressed, "#btn-debug") - def btn_debug(self): - pass + @on(Checkbox.Changed) + def _emit_checkbox_signals(self, event: Checkbox.Changed) -> None: + """Handle emitting the checkbox changed signal""" + self.sig_checkbox_changed.emit(event) - @on(Button.Pressed, "#btn-run") - def action_run(self): - pass + @on(Input.Submitted) + def _emit_input_submit_signals(self, event: Input.Changed) -> None: + """Handle emitting the input changed signal""" + self.sig_input_submit.emit(event) - # ==== Panel and Controls ==== - def on_tabbed_content_tab_activated(self, event: TabbedContent.TabActivated): - """Dynamically switches the Right Sidebar content AND Title.""" - pass + @on(SelectionList.SelectionToggled) + def _emit_selection_list_toggled(self, event: SelectionList.SelectionToggled) -> None: + """Handle emitting the selection list toggled signal""" + self.sig_selection_list_toggled.emit(event) - # ==== File Manager ==== - @on(Button.Pressed, '#btn_refresh_project_dir') - def btn_refresh_project_dir(self): - dir_tree: DirectoryTree = self.query_one("#project-dir-tree") - dir_tree.reload() - self.notify(f"Refreshed Project Directory...") + @on(Select.Changed) + def _emit_select_changed(self, event: Select.Changed) -> None: + """Handle emitting the select changed signal""" + self.sig_select_changed.emit(event) class Main(App): CSS_PATH = "styles.tcss" + @property + def editor_screen(self) -> EditorScreen: + return cast(EditorScreen, self.get_screen('editor')) + def on_mount(self): # create the screens and push the welcome page self.install_screen(WelcomeScreen(), name="welcome") self.install_screen(EditorScreen(), name="editor") - self.push_screen("welcome") + def on_project_opened(result: dict): + self.MODEL = model.Model( + result["project_name"], + result["project_path"], + self.editor_screen + ) + self.push_screen("editor") + self.push_screen("welcome", callback=on_project_opened) def action_quit(self): if not hasattr(self, "MODEL"): self.exit() def handle_modal_result(result: dict): if result["pressed_button"] == "Yes": + # noinspection PyUnresolvedReferences + self.screen.action_save_file() if result["checkbox"]["save_config"]["value"]: - self.app.MODEL.plugins_save_configs() + # noinspection PyTypeChecker + self.editor_screen.sig_save_config_directive.emit() self.exit() # Push the screen with the configuration and callback self.app.push_screen( @@ -448,6 +492,5 @@ def handle_modal_result(result: dict): if __name__ == "__main__": - import textual.events app = Main() app.run() diff --git a/tests/example-studio-project/ca.flow b/tests/example-studio-project/ca.flow new file mode 100644 index 0000000..b81b0ab --- /dev/null +++ b/tests/example-studio-project/ca.flow @@ -0,0 +1,10 @@ +// Rule 30 +@init("A" * 10 + "B" + 10 * "A"); +@import(ca.fp); + +// define the rules +@decode(wns, AB, 30); + +// Run n times +// @clear(); +@evolve(10); diff --git a/tests/example-studio-project/plugins/example.py b/tests/example-studio-project/plugins/example.py new file mode 100644 index 0000000..86a3c3d --- /dev/null +++ b/tests/example-studio-project/plugins/example.py @@ -0,0 +1,21 @@ +# Textual Imports +from textual.widgets import TabPane, Label, Collapsible +from textual.widget import Widget + +# Standard Imports +from typing import Iterator +from studio.model import Plugin + + +class P(Plugin): + def on_initialized(self) -> None: + self.name = 'Example' + + def controls(self) -> Iterator[Widget]: + return iter([Collapsible(Label("Settings go here!"), Label(" ... "), title='Example Setting')]) + + def panel(self) -> TabPane | None: + return TabPane(self.name.title(), Label(' This is the panel that would be developed in this plugin.')) + + +plugin = P() diff --git a/tests/example-studio-project/sss.flow b/tests/example-studio-project/sss.flow new file mode 100644 index 0000000..a6649e3 --- /dev/null +++ b/tests/example-studio-project/sss.flow @@ -0,0 +1,10 @@ +// set the initial state +@init("AB"); + +// define the sequential rules +ABA -> AAB; +A -> ABA; + +// Evolve n times +//@clear(); +@evolve(10); diff --git a/src/implementations/game_of_life.py b/tests/game_of_life.py similarity index 100% rename from src/implementations/game_of_life.py rename to tests/game_of_life.py diff --git a/tests/lang/rule_30.flow b/tests/lang/rule_30.flow index 5dec60f..0521bda 100644 --- a/tests/lang/rule_30.flow +++ b/tests/lang/rule_30.flow @@ -1,5 +1,5 @@ // Rule 30 -@init(AAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAA); +@import(ca.fp) // define the rules @compress(0); diff --git a/tests/lang/test.py b/tests/lang/test.py index 2cdf043..843c0d9 100644 --- a/tests/lang/test.py +++ b/tests/lang/test.py @@ -29,7 +29,7 @@ def get_mem(): // A -> ACB; """ flow = FlowLang(code) - time = timeit.timeit(lambda: flow.evolve_n(18), number=1) + time = timeit.timeit(lambda: flow.evolve(18), number=1) mem_end = get_mem() print(f"Total Memory of evolution: {mem_end - mem_start:.2f} MB") diff --git a/tests/test0.py b/tests/test0.py index e69de29..d61f2bf 100644 --- a/tests/test0.py +++ b/tests/test0.py @@ -0,0 +1,3 @@ +a = {1, 4, 3} +b = sorted(a) +print(b)