In [1]:
#| default_exp helpers

In [2]:
#| export
from __future__ import annotations


# Helpers

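> Shared helpers for Bridget: configuration, command execution, ID generation, class patching, FastHTML and display utilities.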

# Prologue

In [3]:
#| export
import functools
import os
import sys
import time
from binascii import hexlify
from datetime import datetime
from functools import cache
from functools import partial
from math import floor
from pathlib import Path
from types import ModuleType
from typing import Any
from typing import Callable
from typing import DefaultDict
from typing import Mapping
from typing import overload
from typing import Sequence

import fastcore.all as FC
import IPython.display
import nbdev.config
from fastcore.xml import to_xml
from fasthtml.core import FastHTML
from fasthtml.core import FT
from fasthtml.xtend import Style
from IPython.display import DisplayHandle
from olio.common import bundle_path
from olio.common import Config
from olio.common import is_listy
from olio.common import shorten
from olio.common import update_


In [4]:
#| export
from fasthtml.components import Span, Summary, Ul, Details, Li

In [5]:
import dataclasses
import importlib
import inspect
import json
import operator
from functools import reduce
from inspect import Parameter

from fastcore.foundation import L
from fastcore.test import *
from fasthtml.components import show
from IPython.display import HTML
from olio.basic import _get_globals
from olio.common import empty
from olio.common import setup_console
from olio.test import test_raises
from fastcore.xml import to_xml


----

In [6]:
console, cprint = setup_console(140)

----

# Helpers

In [7]:
#| export

# Shared empty dict / list / tuple objects. `emptyd` and `emptyl` are mutable and module-wide,
# so presumably they are meant as read-only sentinels — mutating them would affect every user.
emptyd, emptyl, emptyt = {}, [], ()
_n = '\n'  # newline shorthand

# Configuration

## DEBUG

In [8]:
#| export

def DEBUG(iftrue:Any=True, iffalse:Any=False, k='DEBUG_BRIDGET'):
    """Pick between two values depending on a debug environment flag.

    Returns `iftrue` when environment variable `k` holds (case-insensitively) one of
    `true`, `1`, `yes` or `y`; returns `iffalse` otherwise, including when `k` is unset.
    """
    truthy = ('true', '1', 'yes', 'y')
    flag = os.environ.get(k, '').lower()
    if flag in truthy: return iftrue
    return iffalse

## BridgeCfg


In [9]:
#| export

# Settings for locating and rewriting js modules.
# NOTE(review): the `out_dir` default is a list built once, when the class body runs; unless
# `Config` copies defaults per instance, in-place changes (see `BridgeCfg.for_module`) are
# shared by every `BundleCfg` — confirm against `olio.common.Config`.
class BundleCfg(Config):
    out_dir: list[Path] = [nbdev.config.get_config().lib_path/'js']  # directories to search for js modules
    rewrite_imports: bool = True  # rewrite imports to use dynamic import
    import_name: str = 'brdimport'  # name of the dynamic import function

class BridgeCfg(Config):
    """
    Core Bridget behavior settings.

    Flags (active when `True`):
    - `auto_show`: FastHTML objects display as HTML instead of markdown.
    - `auto_mount`: components with routes are automatically mounted.
    - `auto_id`: display elements get auto-generated IDs.
    - `bootstrap`: load bridget.js on import.

    Other settings:
    - `bundle_cfg`: the `BundleCfg` holding the js bundling options.
    - `current_did`: the ID of the current display cell.
    """
    auto_show: bool = False
    auto_mount: bool = False
    auto_id: bool = False
    bundle_cfg: BundleCfg = BundleCfg()
    # Read once, when the class body runs, from BRIDGET_BOOTSTRAP (case-insensitive true/1/on/yes/y)
    bootstrap: bool = os.environ.get('BRIDGET_BOOTSTRAP', '').lower() in ('true', '1', 'on', 'yes', 'y')
    current_did: str|None = None

    def for_module(self, module: str|ModuleType, dir='js') -> BridgeCfg:
        "Set up BridgeCfg for a specific module by adding its js directory to bundle search paths"
        # Insert `<bundle_path(module)>/<dir>` at the front of `out_dir` unless already listed;
        # the list is modified in place and `self` is returned so calls can be chained.
        if (p := bundle_path(module).resolve())/dir not in self.bundle_cfg.out_dir: self.bundle_cfg.out_dir.insert(0, p/dir)
        return self

# Module-level settings instance
bridge_cfg = BridgeCfg()

In [10]:
cprint(bridge_cfg.for_module(__name__))

# Command Execution

## run_command

In [11]:
#| export

async def arun_command(command: str, cwd: Path|None=None, **kwargs):
    """Async version of `run_command` using anyio.

    Runs `command` through `anyio.run_process` and returns `(stdout, stderr)` decoded as
    UTF-8. A non-zero exit status does not raise: the `CalledProcessError` is caught and
    the captured output of the failed process is returned instead.

    `cwd` defaults to the parent of the current working directory; extra `kwargs` are
    forwarded to `anyio.run_process`. A stream that was not captured (e.g. redirected by
    the caller through `kwargs`) is returned as an empty string.
    """
    import anyio
    import subprocess

    def _text(stream: bytes|None) -> str:
        # A stream that was not captured is reported as None rather than b''
        return '' if stream is None else stream.decode('utf-8')

    try:
        process = await anyio.run_process(
            command,
            cwd=cwd or Path().absolute().parent,
            **kwargs
        )
    except subprocess.CalledProcessError as e:
        return _text(e.stdout), _text(e.stderr)
    return _text(process.stdout), _text(process.stderr)

def run_command(command: str, cwd: Path|None=None, **kwargs):
    """Execute a shell command synchronously and return `(stdout, stderr)` as text.

    `command` is run through the shell with both streams captured. `cwd` defaults to the
    parent of the current working directory; extra `kwargs` are forwarded to
    `subprocess.run`.

    Raises `subprocess.CalledProcessError` when the command exits with a non-zero status;
    the captured streams are available on the exception as `.stdout` and `.stderr`.
    """
    import subprocess
    # `check=True` makes `subprocess.run` raise on failure, so no manual returncode test is needed
    result = subprocess.run(
        command,
        shell=True,
        capture_output=True,
        text=True,
        cwd=cwd or Path().absolute().parent,
        check=True,
        **kwargs
    )
    return result.stdout, result.stderr

In [12]:
# Successful command: stdout is returned (expects a `bridget` entry in the default cwd, i.e. the parent directory)
a,_ = await arun_command('ls')
test_eq('bridget' in a, True)
# Failing command: no exception, the stderr of the failed process is returned (needs `node` on PATH)
_,b = await arun_command('node notfound')
test_eq('Error: Cannot find module' in b, True)

# A non-existent working directory surfaces as FileNotFoundError
with test_raises(FileNotFoundError):
    await arun_command('ls', cwd=Path('/not/found'))

# Time Formatting

In [13]:
#| export

def ts():
    "Current local time formatted as `HH:MM:SS.mmm`"
    stamp = datetime.now().strftime('%H:%M:%S.%f')
    return stamp[:-3]  # `%f` gives 6 microsecond digits; keep only the first 3 (milliseconds)

In [14]:
ts()

'11:23:22.138'

In [15]:
#| export

def ms2str(ts) -> str:
    """Format a POSIX timestamp as a local time-of-day string `HH:MM:SS[.ffffff]`.

    `ts` is expressed in seconds since the epoch (as returned by `time.time()` or
    `datetime.timestamp()`), optionally with a fractional part. Trailing zeros of the
    fractional part are dropped; whole-second timestamps come back as plain `HH:MM:SS`.
    """
    s = datetime.fromtimestamp(ts).time().isoformat()
    # Only trim zeros of the fraction: without a '.', rstrip would eat the seconds ('10:30:20' -> '10:30:2')
    return s.rstrip('0') if '.' in s else s

In [16]:
# Use a distinct name so the exported `ts()` helper is not shadowed by a float in the notebook namespace
now_ts = datetime.now().timestamp()
now_ts, ms2str(now_ts)

(1757323402.147262, '11:23:22.147262')

# Id generation

## Kounter
> Counter of keys

In [17]:
#| export

class Kounter:
    "Per-key call counter: each call bumps the tally of the given key and returns the new value"
    def __init__(self):
        self.d = DefaultDict(int)  # unseen keys start at 0

    def __call__(self, k):
        tally = self.d[k] + 1
        self.d[k] = tally
        return tally

kounter = Kounter()

In [18]:
# Each call increments the tally of its key: 'a' is counted 3 times, 'b' 4 times
cntr = Kounter()
cntr('a')
cntr('b')
cntr('a')
cntr('a')
cntr('b')
cntr('b')
cntr('b')
test_eq(cntr.d, {'a': 3, 'b': 4})
# A key seen for the first time returns 1
test_eq(cntr('int'), 1)

## id_gen
> Generate unique session IDs


In [19]:
import random
import re
from pathlib import Path

# Load the word list; the path is relative, so it is resolved against the current working directory
lines = Path("static/wordlist.txt").read_text().splitlines()
# `isalpha` is tested on the unstripped line, so lines with surrounding whitespace or non-letters are dropped
words = [line.strip() for line in lines if line.isalpha()]

In [20]:
def modify_word(word):
    "Uppercase the first or second letter of `word` (picked at random) and, half of the time, append a number in 0-99"
    if len(word) > 1:
        pos = random.choice([0, 1])
        word = f"{word[:pos]}{word[pos].upper()}{word[pos + 1:]}"
    else:
        # Single-character (or empty) input: uppercase it whole
        word = word.upper()

    # Coin flip: return the word as is, or with a random number appended at the end
    if not random.choice([True, False]): return word
    return f"{word}{random.randint(0, 99)}"

def generate_readable_id(num_words=3):
    "Build an ID from `num_words` randomly picked and modified words joined by '-', plus a random 0-9999 suffix"
    parts = []
    for _ in range(num_words):
        parts.append(modify_word(random.choice(words)))
    candidate = '-'.join(parts)

    # Prefix with '_' when the joined words would not form a valid CSS identifier
    if re.match(r"^[a-zA-Z_][\w\-]*$", candidate) is None:
        candidate = f"_{candidate}"

    suffix = random.randint(0, 9999)
    return f"{candidate}-{suffix}"

In [21]:
generate_readable_id(), generate_readable_id()

('cOmmonly-Solves39-oPen24-855', 'bEaring-fLashes-pOssibly-6611')

In [22]:
#| export

# Whole-second epoch timestamp captured once, when this module is loaded
SESSION_TS = f"{floor(time.time())}"

def simple_id():
    "Random ID: the letter 'b' followed by 16 random bytes as hex, in four '-'-separated groups of 8 digits"
    return 'b' + os.urandom(16).hex('-', 4)

def id_gen():
    """Build an ID factory that keeps its own per-key counters.

    The returned function maps:
    - no argument (or `None`) to a fresh random `simple_id()`;
    - a string `s` to `"{s}_{n}-{SESSION_TS}"`;
    - any other object to `"{TypeName}_{n}-{SESSION_TS}"`;
    where `n` is how many times that string / type name has been seen by this factory.
    """
    counts = Kounter()
    def _(o=None):
        if o is None: return simple_id()
        # Strings are their own key; other objects are keyed by their type name
        key = o if isinstance(o, str) else type(o).__name__
        return f"{key}_{counts(key)}-{SESSION_TS}"
    return _

In [23]:
simple_id()

'b1a229e86-ce5f0bc4-d8997daa-8ff815a4'

The `id_gen` function creates a function that takes any object and generates a unique ID that is valid during the current session. This is useful for creating unique element IDs in dynamic HTML content.


In [24]:
new_id = id_gen()
new_id(), new_id()

('b9039bae4-e764e417-eb5cc88d-6aa963c7',
 'bc46b383e-da955e86-30bbde2f-e40f2fbc')

In [25]:
int_id = id_gen()
int_id(7), int_id(7), int_id(888)

('int_1-1757323402', 'int_2-1757323402', 'int_3-1757323402')

In [26]:
int_id = id_gen()
int_id('asdf'), int_id('asdf'), int_id('asdf')

('asdf_1-1757323402', 'asdf_2-1757323402', 'asdf_3-1757323402')

In [27]:
obj_id = id_gen()
o1, o2 = object(), object()
print(obj_id(o1), obj_id(o2))

dict_id = id_gen()
print(dict_id(d1 := {'a': 1}), dict_id(d2 := {'b': 7}))

pth_id = id_gen()
print(pth_id(Path('.')), pth_id(Path()), pth_id(Path('./bin')))

object_1-1757323402 object_2-1757323402
dict_1-1757323402 dict_2-1757323402
PosixPath_1-1757323402 PosixPath_2-1757323402 PosixPath_3-1757323402


# Class Patching Utilities

## patch_cached

In [28]:
#| export

def patch_cached(cls, f, name:str|None=None):
    """Attach `f` to `cls` as a method memoized with `functools.cache`.

    The attribute is called `name` when given; otherwise the function's own `__name__`
    is used (for a `partial`, that of the wrapped function).
    """
    target = f.func if isinstance(f, partial) else f
    setattr(cls, name or target.__name__, cache(f))

In [29]:
# type: ignore

from collections import defaultdict

# Fixture: `d` records every real (non-cached) invocation of the patched method
class Test:
    def __init__(self): self.d = defaultdict(list)

def a(self, n:int=1):
    self.d['a'].append(n)
    return n+1


patch_cached(Test, a)

# Repeated calls with the same argument run the body only once
t1 = Test()
test_eq(t1.a(), 2)
test_eq(t1.a(), t1.a())
test_eq(t1.d['a'], [1])
test_eq(t1.a(3), 4)
test_eq(t1.a(3), t1.a(3))
test_eq(t1.d['a'], [1, 3])

# A second instance gets its own cached results
t2 = Test()
test_eq(t2.a(), 2)
test_eq(t2.a(), t2.a())
test_eq(t2.d['a'], [1])
test_eq(t2.a(7), 8)
test_eq(t2.a(7), t2.a(7))
test_eq(t2.d['a'], [1, 7])

## patch_cached_property

In [30]:
#| export

def patch_cached_property(cls, f, name:str|None=None):
    """Attach `f` to `cls` as a `functools.cached_property`, with `partial` support.

    The attribute is called `name` when given, otherwise the function's `__name__`. For a
    `partial`, both the name and the docstring are taken from the wrapped function.
    """
    prop = functools.cached_property(f)
    source = f.func if isinstance(f, partial) else f
    if source is not f: prop.__doc__ = source.__doc__
    # `__set_name__` is not triggered by `setattr` on an existing class, so set `attrname` by hand
    prop.attrname = name or source.__name__
    setattr(cls, prop.attrname, prop)

In [31]:
# type: ignore

# Fixtures: `d` records every real (non-cached) evaluation
def a(self): 
    "a docs"
    self.d['a'].append('a'); return 2
def _b(self, n): 
    "b docs"
    self.d['b'].append(n); return n+n
class Test: 
    def __init__(self): self.d = defaultdict(list)

# Plain function, lambda, partial and partial-wrapped lambda (the last three need an explicit name)
patch_cached_property(Test, a)
patch_cached_property(Test, lambda self: _b(self, 2), 'b2')
patch_cached_property(Test, partial(_b, n=3), 'b3')
patch_cached_property(Test, partial(lambda self: _b(self, 4)), 'b4')

# Each property is computed once per instance, on first access
t1 = Test()
test_eq(t1.a, 2)
test_eq(t1.a, t1.a)
test_eq(t1.d['a'], ['a'])
test_eq(t1.b2, 4)
test_eq(t1.b2, t1.b2)
test_eq(t1.d['b'], [2])
test_eq(t1.b3, 6)
test_eq(t1.b3, t1.b3)
test_eq(t1.d['b'], [2, 3])
test_eq(t1.b4, 8)
test_eq(t1.b4, t1.b4)
test_eq(t1.d['b'], [2, 3, 4])

## cached_property

In [32]:
#| export

class cached_property(functools.cached_property):
    """Enhanced `cached_property` that preserves function attributes.

    After the regular initialization, every attribute named in
    `functools.WRAPPER_ASSIGNMENTS` (such as `__name__`, `__qualname__`, `__doc__`) is copied
    from `func` onto the property. Attributes that `func` does not have are skipped, so
    callables such as `functools.partial` objects are accepted too.
    """
    def __init__(self, func):
        super().__init__(func)
        for attr in functools.WRAPPER_ASSIGNMENTS:
            # Like `functools.update_wrapper`, ignore attributes the callable lacks
            # (e.g. a `partial` has no `__name__`) instead of raising AttributeError
            try: value = getattr(func, attr)
            except AttributeError: continue
            setattr(self, attr, value)
    # def __set_name__(self, owner, name):
    #     super().__set_name__(owner, name)
    #     if self.attrname is None:
    #         self.__qualname__ = f"{owner.__name__}.{name}"

# bridge_metadata

In [33]:
#| export

def bridge_metadata(metadata:dict|None=None, **kwargs):
    """Add or update the 'bridge' entry of a metadata dict with `kwargs`.

    A missing or empty `metadata` yields a brand-new dict (an empty dict passed in is left
    untouched). A non-empty `metadata` is modified in place and returned.
    """
    if not metadata: return {'bridge': dict(kwargs)}
    if 'bridge' in metadata: metadata['bridge'].update(**kwargs)
    else: metadata['bridge'] = dict(kwargs)
    return metadata

def skip(metadata:dict|None=None, **kwargs):
    """Shortcut for `bridge_metadata` that always sets `skip=True`.

    Any other `kwargs` are stored in the 'bridge' entry as well.
    """
    return bridge_metadata(metadata, **kwargs, skip=True)

In [34]:
# No metadata given: a new dict holding only the 'bridge' entry is returned
test_eq(bridge_metadata(), {'bridge': {}})
test_eq(bridge_metadata(skip=True), {'bridge': {'skip': True}})
# Existing metadata: other keys are kept and the 'bridge' entry is added
test_eq(bridge_metadata({'autoshow': True}, skip=True), {'autoshow': True, 'bridge': {'skip': True}})
# `skip` always adds skip=True, merging with whatever the 'bridge' entry already holds
test_eq(skip(), {'bridge': {'skip': True}})
test_eq(skip({'bridge': {'auto_show': True}}), {'bridge': {'skip': True, 'auto_show': True}})
test_eq(skip(auto_show=True), {'bridge': {'skip': True, 'auto_show': True}})

# Function Composition

## compose_first
> like [fastcore.compose](https://fastcore.fast.ai/basics.html#compose), but args are passed only to first function

In [35]:
#| export

def compose_first(*funcs:Callable, order:Callable | None = None) -> Callable:
    """Chain `funcs` left to right; only the first one receives the extra `*args` / `**kwargs`.

    With no functions `FC.noop` is returned, and a single function is returned as is.
    `order`: optional key function used to sort `funcs` before composing."""
    fs = FC.listify(funcs)  # type: ignore
    count = len(fs)
    if count == 0: return FC.noop
    if count == 1: return fs[0]
    if order is not None: fs = FC.sorted_ex(fs, key=order)  # type: ignore
    head, *rest = fs
    def _inner(x, *args, **kwargs):
        # The first function gets every argument; each later one only gets the previous result
        return functools.reduce(lambda acc, f: f(acc), rest, head(x, *args, **kwargs))  # type: ignore
    return _inner

In [36]:
def add_one(x): return x + 1
def multiply_two(x): return x * 2
def add_ten(x): return x + 10

# A single function is returned unchanged
single = compose_first(add_one)
test_eq(single(5), 6)
test_eq(single, add_one)

# No functions: `FC.noop` is returned (named `no_funcs` so `empty` imported from olio.common is not shadowed)
no_funcs = compose_first()
test_eq(no_funcs, FC.noop)

composed = compose_first(add_one, multiply_two, add_ten)
test_eq(composed(5), 22)  # ((5+1) * 2) + 10 = 22

In [37]:
# Helpers that wrap their input, making the application order visible in the result
def tag_a(x): return f"a({x})"
def tag_b(x): return f"b({x})"
def tag_c(x): return f"c({x})"

# Without `order`, functions are applied in the order given
natural = compose_first(tag_c, tag_a, tag_b)
test_eq(natural("x"), "b(a(c(x)))")

# With `order`, functions are sorted by the key before being composed
ordered = compose_first(tag_c, tag_a, tag_b, order=lambda f: f.__name__)
test_eq(ordered("x"), "c(b(a(x)))")  # Functions sorted: tag_a, tag_b, tag_c

# FastHTML Utilities

## nb_app
> Basic naked FastHTML app

In [38]:
#| export

@FC.delegates(FastHTML)  # type: ignore
def nb_app(**kwargs):
    "Create a bare `FastHTML` app: `default_hdrs=False`, `sess_cls=None`, and any CORS middleware removed"
    from starlette.middleware.cors import CORSMiddleware
    # These two settings always win over whatever the caller passed
    kwargs['default_hdrs'] = False
    kwargs['sess_cls'] = None
    app = FastHTML(**kwargs)
    app.user_middleware = [m for m in app.user_middleware if m.cls is not CORSMiddleware]
    return app


In [39]:
app = nb_app()
test_eq(app.user_middleware, [])

## CLog

In [40]:
#| export

def CLog(*o):
    """Build a `<script>` snippet that passes the given values to `console.log`.

    Each argument is embedded through its Python `repr`, so only values whose repr is also
    a valid JavaScript expression (e.g. simple strings and numbers) give a working script.
    """
    args = ','.join(repr(x) for x in o)
    return '<script>console.log(' + args + ')</script>'

In [41]:
display(HTML(CLog('aaaa', 'bbbb')))

# Display Utilities

## displaydh
> Convenience function and definitions, just to avoid the red squiggly underlines from the type checker.

In [42]:
#| export

# Reference `IPython.display.display` explicitly in the decorator: its argument is evaluated when
# this function is defined, before the module binds its own `display` name, and the bare builtin
# `display` only exists inside a running IPython shell (elsewhere it would raise NameError).
@FC.delegates(IPython.display.display, keep=True)  # type: ignore
def displaydh(*objs, **kwargs) -> DisplayHandle:
    # `display_id=True` is merged into kwargs through `update_` (presumably overriding any caller
    # value — confirm in olio.common); a truthy display_id makes `display` return a DisplayHandle.
    return display(*objs, **update_(kwargs, display_id=True))  # type: ignore
displaydh.__doc__ = IPython.display.display.__doc__  # type: ignore

In [43]:
#| exporti

# Typing-only declarations: they describe for type checkers how the return type of `display`
# depends on `display_id`. At runtime the name is rebound to the real IPython function below.

# `display_id` explicitly passed -> a DisplayHandle is returned. The parameter has no default
# here so that a call without `display_id` does not match this overload.
@overload
def display(
    *objs, include=None, exclude=None, metadata=None, transient=None,
    display_id:bool|str,
    raw=False, clear=False, **kwargs) -> DisplayHandle: ...
# `display_id` omitted (or None) -> nothing is returned
@overload
def display(
    *objs, include=None, exclude=None, metadata=None, transient=None,
    display_id=None,
    raw=False, clear=False, **kwargs) -> None: ...
def display(
    *objs, include=None, exclude=None, metadata=None, transient=None,
    display_id=None,
    raw=False, clear=False, **kwargs) -> DisplayHandle | None: ...

# The actual implementation
display = IPython.display.display  # type: ignore

In [44]:
dh = display('a', display_id=True)
# dh

'a'

In [45]:
dh = display('b', display_id="1234567890")
dh

'b'

<DisplayHandle display_id=1234567890>

In [46]:
dh = display('c')
dh

'c'

## DetailsJSON

In [47]:
%%HTML
<style>
    details ul { list-style-type:none; list-style-position: outside; padding-inline-start: 22px; margin: 0px; }
</style>
<details open>
<summary>Apollo astronauts</summary>
<ul>
  <li><span>1</span>: Neil Armstrong</li>
  <li><span>2</span>: Alan Bean</li>
  <li><details>
<summary>Apollo 11</summary>
<ul>
  <li><span>1</span>: Neil Armstrong</li>
  <li><span>2</span>: Alan Bean</li>
  <li><div><span>3</span>: Buzz Aldrin</div></li>
  <li><span>4</span>: Edgar Mitchell</li>
  <li><span>5</span>: Alan Shepard</li>
</ul></details></li>
  <li><span>4</span>: Edgar Mitchell</li>
  <li><span>5</span>: Alan Shepard</li>
</ul>

</details>


In [48]:
#| export

def Val(v):
    """Render a value as a `Span` whose CSS class reflects its type.

    The class is `v` plus one of `null`, `true`, `false`, `string`, `number`, or nothing for
    any other type. `None` is shown as the text 'None'; other values go through `shorten`.
    """
    if v is None: kind = 'null'
    elif v is True: kind = 'true'
    elif v is False: kind = 'false'
    elif isinstance(v, str): kind = 'string'
    elif isinstance(v, (int, float)): kind = 'number'
    else: kind = ''
    text = 'None' if v is None else shorten(v, 'r', 140)
    return Span(text, cls=f"v {kind}")
def NameVal(k, v):
    "Render a `name: value` pair: the name in a `Span` with class `n`, the value via `Val`"
    name = Span(k, cls='n')
    return Span(name, ': ', Val(v))

class DetailsJSON(dict):
    """Interactive collapsible JSON viewer with HTML details/summary structure.

    A `dict` rendered through `__ft__` as nested `<details>` elements.
    - `summary`: title of the top-level element ('summary' when empty).
    - `open`: whether the top-level element starts expanded.
    - `openall`: expand every level.
    - `skip`: keys whose values are rendered as a plain `name: value` item, never expanded.
    """
    _css_ = (
        'details ul { list-style-type:none; list-style-position: outside; padding-inline-start: 22px; margin: 0; } '
        '''details .string { color: #24837b; } details .string::before { content: "'"; } details .string::after { content: "'"; } '''
        'details .number { color: #ad8301; } '
        'details .true { color: blue; } '
        'details .false { color: red; } '
        'details .null { color: gray; } '
        'span.n { color: darkgrey; } '
    )

    def __init__(self, *args, summary:str='', open:bool=True, openall:bool=False, skip:Sequence[str]=(), **kwargs):
        super().__init__(*args, **kwargs)
        self.summary = str(summary)
        self.open = open
        self.openall = openall
        self.skip = skip

    def __ft__(self, d:Mapping|None=None, summary:str|None=None, lvl:int=0, open:bool=False):
        # Called without a mapping: render this dict itself, using the instance settings
        if d is None:
            d, summary, open = self, self.summary or 'summary', self.open
        open = self.openall or open

        def _entry(k, v):
            # Skipped keys and scalar values become a plain item; mappings and
            # list-likes (turned into an index -> item dict) recurse one level deeper
            if k in self.skip: return Li(NameVal(k, v))
            if isinstance(v, Mapping): return self.__ft__(v, k, lvl+1)
            if is_listy(v): return self.__ft__(dict(zip(range(len(v)), v)), k, lvl+1)
            return Li(NameVal(k, v))

        # The stylesheet is emitted only once, at the top level
        style = Style(self._css_) if lvl == 0 else ()
        tree = Details(open=open)(
            Summary(summary, _n),
            Ul()(*(_entry(k, v) for k, v in d.items())))
        return (style, tree)

In [49]:
dtl = DetailsJSON({
    '1': 'Neil Armstrong',
    '2': 'Alan Bean',
    '3': 'Buzz Aldrin',
    'letters': {
        'a': 1, 
        'b': 2
        },
    '5': 'Edgar Mitchell',
    '6': 'Alan Shepard'
}, summary='Apollo astronauts')

# test_eq(val_at(dtl, '5'), 'Edgar Mitchell')
# test_eq(val_at(dtl, 'letters.a'), 1)
# cprint(to_xml(dtl))
show(dtl)

In [50]:
d= {
    "idx": 1,
    "cell_type": "code",
    "source": "# cell 1\nimport time\nfrom itertools import count\n\nimport ipywidgets as W\nimport matplotlib.pyplot as plt\nfrom bridget.helpers import displaydh\nfrom IPython.display import HTML, Image, Javascript, JSON, DisplayHandle, clear_output\ncounter = count()",
    "id": "W1sZmlsZQ==",
    "metadata": {
        "brd": {
            "id": "717322f8-95fa-425c-839d-8b9e7d4ef921"
        }
    },
    "outputs": [
        {'output_type': 'stream', 'name': 'stdout', 'text': '1\n'},
        {'output_type': 'stream', 'name': 'stdout', 'text': '2\n'}
    ],
    "execution_count": 1
}
show(DetailsJSON(d, openall=True))

## HTML
> Convenience `IPython.display.HTML` subclass that accepts `fastcore.xml.FT` objects and adds any extra keyword arguments to the metadata.

In [51]:
#| export

class HTML(IPython.display.HTML):
    """`IPython.display.HTML` that also accepts FT components and metadata keyword arguments.

    - `data`: an `FT` instance or any object with `__ft__` is converted with `to_xml`;
      otherwise an object with a `to_html` method is converted by calling it.
    - `**kwargs`: extra keyword arguments are merged into `metadata`.
    """
    def __init__(self, data=None, url=None, filename=None, metadata=None, **kwargs):
        if kwargs:
            # Merge into a new dict so the `metadata` passed by the caller is not mutated
            metadata = {**metadata, **kwargs} if metadata else kwargs
        if isinstance(data, FT) or hasattr(data, '__ft__'): data = to_xml(data)
        elif hasattr(data, 'to_html'): data = data.to_html()  # type: ignore
        super().__init__(data, url, filename, metadata)

# Environment Detection

## IN_VSCODE

In [52]:
#| export

def in_vscode():
    "Report whether a module named `vscode` has been imported (used as the VSCode indicator)"
    loaded = sys.modules
    return 'vscode' in loaded
def in_vscode_notebook(glbs):
    """Check if the code is running in a VSCode notebook.

    `glbs` is a namespace mapping (e.g. `globals()`); the check is whether it contains
    the `__vsc_ipynb_file__` key.
    """
    marker = '__vsc_ipynb_file__'
    return marker in glbs

# Evaluated once, when this module is loaded
IN_VSCODE = in_vscode()

In [53]:
IN_VSCODE, in_vscode_notebook(globals())

(True, True)

# Colophon
----

In [54]:
import fastcore.all as FC
import nbdev
from nbdev.clean import nbdev_clean

In [55]:
# Run the nbdev export for this notebook, only when executing inside a notebook
if FC.IN_NOTEBOOK:
    nb_path = '01_helpers.ipynb'
    # nbdev_clean(nb_path)
    nbdev.nbdev_export(nb_path)