In [None]:
#| default_exp basic

In [None]:
#| export
from __future__ import annotations

# basic
> basic helpers

# Prologue


In [None]:
#| export
import importlib
import operator
import os
import pprint
import re
import sys
from binascii import hexlify
from inspect import Parameter
from pathlib import Path
from types import ModuleType
from typing import Any
from typing import Callable
from typing import DefaultDict
from typing import Hashable
from typing import Iterable
from typing import Literal
from typing import Mapping
from typing import MutableMapping
from typing import Self
from typing import Sequence
from typing import Type
from typing import TypeAlias
from typing import TypeVar

import fastcore.all as FC


In [None]:
import dataclasses
import json
from functools import reduce
from pathlib import Path
from types import NoneType
from types import SimpleNamespace
from typing import TypeVar

from fastcore.test import *

In [None]:
from olio.test import *

----

# empty

In [None]:
#| exporti

class Empty(type):
    "Metaclass whose classes print as the bare word `empty`, both with `repr` and with `str`."
    def __repr__(self): return 'empty'
    __str__ = __repr__

class EmptyT(metaclass=Empty):
    "Marker class built on `Empty`; the class object itself is what prints as `empty`."

# _EMPTY: TypeAlias = Parameter.empty
# EmptyT = Type[_EMPTY]

In [None]:
#| export

# Public sentinel: the `EmptyT` class object itself stands for "no value given"
empty = EmptyT

def is_empty(x) -> bool:
    "Return `True` only when `x` is the `empty` sentinel itself (identity check, not equality)."
    return empty is x

In [None]:
test_is(str(empty), 'empty')
empty

empty

In [None]:
def test_empty() -> Empty: return empty
test_eq_type(test_empty(), empty)

# AD

In [None]:
#| export

_VT = TypeVar('_VT')
# from `fastcore` + generics
class AD(dict[str, _VT]):
    "`dict` subclass that also provides access to keys as attrs"
    def __getattr__(self, k:str) -> _VT: return self[k] if k in self else FC.stop(AttributeError(k))  # type: ignore
    def __setattr__(self, k, v:_VT): (self.__setitem__, super().__setattr__)[k[0]=='_'](k,v)
    def __dir__(self) -> Iterable[str]: return super().__dir__() + list(self.keys())  # type: ignore
    def _repr_markdown_(self): return f'```json\n{pprint.pformat(self, indent=2)}\n```'
    def copy(self) -> Self: return type(self)(**self)

In [None]:
ad = AD()
test_fail(lambda: ad.a)
test_fail(lambda: ad['b'])

ad = AD(a=1, b=2)
test_eq(ad.a, 1)

ad.b = 3
test_eq(ad.b, 3)
ad.update(a=4, b=5)
test_eq(ad.b, 5)

# is_listy
> Test whether `x` is iterable but not a string or bytes

In [None]:
#| export

def is_listy(x):
    "Return `True` when `x` is an `Iterable` instance that is neither `str` nor `bytes`."
    if isinstance(x, (bytes, str)): return False
    return isinstance(x, Iterable)

def is_listy_type(x):
    "Return `True` when class `x` is a subclass of `Iterable` but not of `str` or `bytes`."
    if issubclass(x, (bytes, str)): return False
    return issubclass(x, Iterable)

In [None]:
for _ in (
    [1, 2, 3], (1, 2, 3), {1, 2, 3}, 
    {'a': 1}, range(3), (i for i in range(3)), FC.L(1, 2, 3),
    ):
    test_is(is_listy(_), True)
for _ in (
    'a', b'a', r'a', None,
    ):
    test_is_not(is_listy(_), True)

for _ in (list, tuple, set, dict, range, type(i for i in range(3)), FC.L):
    test_is(is_listy_type(_), True)
for _ in (str, bytes, type(r'a'), NoneType,):
    test_is_not(is_listy_type(_), True)

# flatten


In [None]:
#| export

def flatten(o):
    "Lazily yield the leaves of `o`, descending into every listy item (`str` and `bytes` count as leaves)"
    for elem in o:
        if is_listy(elem):
            # Descend; if iterating the item raises `TypeError`, yield the item itself instead
            try: yield from flatten(elem)
            except TypeError: yield elem
        else:
            yield elem

In [None]:
for _, expected in (
    ([], []),  
    ([1, 2, 3], (1, 2, 3)), 
    ((1, (2, 3)), (1, 2, 3)), 
    ([('a', (2,)), ('c', (4,))], ('a', 2, 'c', 4)), 
):
    test_eq(flatten(_), expected)

# shorten
> truncate string

In [None]:
#| export

def shorten(x:Any, mode:Literal['l', 'r', 'c']='l', limit=40, trunc='…', empty='') -> str:
    """Return `str(x)`, truncated when it is longer than `limit` characters.

    `mode` selects the part that survives: 'l' drops the left side and keeps the last
    `limit` characters, 'r' drops the right side and keeps the first `limit`, and any
    other value ('c') keeps `limit//2 - 1` characters from each end around `f" {trunc} "`.
    `trunc` marks the cut; `empty` stands in for the dropped side (modes 'l' and 'r' only).
    The marker is not counted against `limit`, so the result may be longer than `limit`.
    """
    s = str(x)
    if len(s) <= limit: return s
    if mode in ('l', 'r'):
        keep = max(limit, 0)
        # Slice from an explicit start index: with keep == 0, `s[-keep:]` is `s[-0:]`,
        # i.e. the whole string rather than nothing
        if mode == 'l': return f'{empty}{trunc}{s[len(s)-keep:]}'
        return f'{s[:keep]}{trunc}{empty}'
    # Clamp so that tiny limits keep nothing instead of producing a negative count
    keep = max(limit//2 - 1, 0)
    return f'{s[:keep]} {trunc} {s[len(s)-keep:]}'

def shortens(xs:Iterable[Any], mode:Literal['l', 'r', 'c']='l', limit=40, trunc='…', empty=''):
    "Lazily apply `shorten` with the same options to every item of `xs`."
    for x in xs: yield shorten(x, mode, limit, trunc, empty)


In [None]:
test_eq(f"{shorten(0)}", '0')
test_eq(f"{shorten(234)}", '234')
test_eq(f"{shorten('asdfgh')}", 'asdfgh')
test_eq(f"{shorten('It was the best of times', limit=12)}", '…est of times')
test_eq(f"{shorten('ad2663b4-5ff5-40e3-a6ed-cc35f5627f8d', limit=17)}", '…a6ed-cc35f5627f8d')
test_eq(f"{shorten('ad2663b4-5ff5-40e3-a6ed-cc35f5627f8d', 'r', limit=17)}", 'ad2663b4-5ff5-40e…')
test_eq(f"{shorten('ad2663b4-5ff5-40e3-a6ed-cc35f5627f8d', 'c', limit=17)}", 'ad2663b … 5627f8d')

In [None]:
test_eq(list(shortens(['ad2663b4-5ff5-40e3-a6ed-cc35f5627f8d'], 'l', 10)), ['…35f5627f8d'])
test_eq(list(shortens(('abcdef', 'wertyu', 'sd'), 'r', 4)), ['abcd…', 'wert…', 'sd'])
test_eq(''.join(shortens('It was the best of times...'.split(), 'r', 1, '')), 'Iwtbot')

# Runner

In [None]:
#| export

_FuncItem: TypeAlias = Callable | Sequence['_FuncItem']

def Runner(*fns: _FuncItem) -> Callable:
    """Return a function that runs callables `fns` in sequence with same arguments. 
    Only side-effects, no composition."""
    _fns: tuple[Callable, ...] = tuple(FC.flatten(fns))  # type: ignore
    if not _fns: return FC.noop
    if len(_fns) == 1: return _fns[0]
    def _(*args, **kwargs) -> None:
        for f in _fns: f(*args, **kwargs)
    return _

In [None]:
def f1(o,x,y=0): o.r += x+y
def f2(o,x,y=0): o.r += x*y
f3 = lambda o, x,y: print(o.r)

o = SimpleNamespace(r=0)
runner = Runner(f1, f2, f3)
runner(o, 2, 3)  # 11

Runner()(o, 2, 3)
test_eq(o.r, 11)
Runner([f1, f2], f3)(o, 2, 3)
test_eq(o.r, 22)
Runner([f1, [f2, f3]])(o, 2, 3)
test_eq(o.r, 33)

11
22
33


# Object helpers

In [None]:
#| export
def setattrs(dest, src, flds=''):
    """Copy fields from `src` into `dest` in place.

    `flds` is a comma-separated string of names to copy. When it is empty, every key of a
    mapping `src` is copied, or every `dir(src)` name not starting with '_' otherwise.
    Values are read with `src.get` when `src` is a mapping (so a listed but missing key
    yields `None`) and with `getattr` otherwise. They are stored as items when `dest` is a
    `MutableMapping` and as attributes otherwise."""
    # Treat every `Mapping` as a mapping source, not only `dict` subclasses: otherwise a
    # read-only or custom mapping would have its methods copied instead of its items.
    src_is_mapping = isinstance(src, Mapping)
    if flds: names = re.split(r",\s*", flds)
    elif src_is_mapping: names = list(src.keys())
    else: names = [n for n in dir(src) if n[0] != '_']
    store = operator.setitem if isinstance(dest, MutableMapping) else setattr
    for name in names:
        store(dest, name, src.get(name) if src_is_mapping else getattr(src, name))

In [None]:
class A: a = 1; b = 2; c = 3
a = A()
setattrs(a, {'b': 22, 'd': 44})
test_eq(a.a, 1)
test_eq(a.b, 22)
test_eq(a.c, 3)
test_eq(a.d, 44)  # type: ignore

# val_at
> Lookup values in json/mappings/sequences using dot notation

In [None]:
def val_at(element, json):
    "Index into `json` once per dot-separated part of `element` and return what is reached."
    node = json
    for part in element.split('.'):
        node = node[part]
    return node

j = {"app": {
    "Garden": {
        "Flowers": {
            "Red flower": "Rose",
            "White Flower": "Jasmine",
            "Yellow Flower": "Marigold"
        }
    },
    "Fruits": {
        "Yellow fruit": "Mango",
        "Green fruit": "Guava",
        "White Flower": "groovy"
    },
    "Trees": {
        "label": {
            "Yellow fruit": "Pumpkin",
            "White Flower": "Bogan"
        }
    }
}}

test_eq(val_at('app.Garden.Flowers.White Flower', j), 'Jasmine')

In [None]:
print((dp := 'app.Garden.Flowers'), reduce(operator.getitem, dp.split('.'), j))

app.Garden.Flowers {'Red flower': 'Rose', 'White Flower': 'Jasmine', 'Yellow Flower': 'Marigold'}


In [None]:
def val_at(element, j:str):
    "Parse JSON text `j`, then follow dot-separated `element`: mappings are indexed by the part itself, anything else by `int(part)`."
    node = json.loads(j)
    for part in element.split('.'):
        node = node[part] if isinstance(node, Mapping) else node[int(part)]
    return node

j2 = {"app": {
    "Garden": {
        "Flowers": {
            "Red flower": "Rose",
            "White Flower": "Jasmine",
            "Yellow Flower": "Marigold"
        }
    },
    "Fruits": {
        "Yellow fruit": ["Mango", {"Banana": ["Canary Island", "Puerto Rico"]}],
        "Green fruit": "Guava",
        "White Flower": "groovy"
    },
    "Trees": {
        "label": {
            "Yellow fruit": "Pumpkin",
            "White Flower": "Bogan"
        }
    }
}}
test_eq(val_at('app.Fruits.Yellow fruit.1.Banana.0', json.dumps(j2)), 'Canary Island')

In [None]:
apollo_astronauts = json.loads(Path('apollo_astronauts.json').read_text())
print((dp := 'Apollo 11.Michael Collins'), reduce(operator.getitem, dp.split('.'), apollo_astronauts))

Apollo 11.Michael Collins {'Experience': 'Pilot on Gemini 10 and Command Module pilot on Apollo 11.', 'Place in history': 'Collins was the first person to perform two EVAs in one mission.', 'Fast fact': 'Collins says his "secret terror" was returning to Earth alone if the surface mission failed.', 'Lunar wisdom': 'I really believe that if the political leaders of the world could see their planet from a distance of 100,000 miles their outlook could be fundamentally changed. That all-important border would be invisible, that noisy argument silenced.'}


In [None]:
_T = TypeVar('_T')
_II = isinstance
def _at(d: Mapping|Sequence, k: str) -> Any: return (
    d[k] if _II(d, Mapping) else 
    d[int(k)] if _II(d, Sequence) and not _II(d, (str, bytes)) else 
    FC.stop(KeyError))  # type: ignore

def val_at(key_path: str, j: Mapping|Sequence|str|bytes|bytearray, default:_T=empty, sep:str='.') -> _T:
    """Follow `sep`-separated `key_path` through `j` and return the value found there.

    `str`/`bytes`/`bytearray` input is decoded with `json.loads` first. When the lookup fails
    with `KeyError`, `IndexError` or `ValueError`, `default` is returned if one was given
    (anything but `empty`); otherwise the error propagates."""
    try:
        keys = key_path.split(sep)
        node = json.loads(j) if _II(j, (str, bytes, bytearray)) else j
        for key in keys: node = _at(node, key)
        return node
    except (KeyError, IndexError, ValueError):
        if default is empty: raise
        return default

def key_at(key_path: str, j: Mapping|Sequence|str|bytes|bytearray, sep:str='.') -> bool:
    """Return `True` if nested `key_path` exists in `j`, `False` otherwise.

    `str`/`bytes`/`bytearray` input is decoded with `json.loads` before the lookup."""
    try:
        reduce(_at, key_path.split(sep), json.loads(j) if _II(j, (str, bytes, bytearray)) else j)
    except (KeyError, IndexError, ValueError):
        # ValueError is what `int(k)` raises when a non-numeric key is applied to a sequence
        # (undecodable JSON text raises it too); `val_at` treats it as "not found", so a
        # bare existence check must answer False rather than raise
        return False
    return True

In [None]:
FC.test_fail(lambda: val_at(object, {}))  # type: ignore
FC.test_fail(lambda: val_at(object, []))  # type: ignore
FC.test_fail(lambda: val_at(object, object))  # type: ignore
FC.test_fail(lambda: val_at('', {}))
FC.test_fail(lambda: val_at('', []))
FC.test_fail(lambda: val_at('', object))   # type: ignore

FC.test_fail(lambda: val_at('a.b', {'a': 1}))
test_eq(val_at('a.b', {'a': 1}, None), None)

In [None]:
d = [
    {'a': 1, 'b': [2,  3], 'c': {'d':   4}}, 
    {'a': 5, 'b': [6,  7],                 'd': [{'e': 81}, {'e': 82}]}, 
    {'a': 9, 'b': [10, 11], 'c': {'d': 12}}
]

test_fail(lambda: val_at('', d))
test_fail(lambda: val_at(object, d))  # type: ignore

test_fail(lambda: val_at('', object))  # type: ignore

test_eq(val_at('0.a', d), 1)
test_eq(val_at('1.b', d), [6, 7])
test_eq(val_at('2.c', d), {'d': 12})

In [None]:
j2 = {
    "app": {
        "Garden": {
            "Flowers": {
                "Red flower": "Rose",
                "White Flower": "Jasmine",
                "Yellow Flower": "Marigold"
            }
        },
        "Fruits": {
            "Yellow fruit": ["Mango", {"Banana": ["Canary Island", "Puerto Rico"]}],
            "Green fruit": "Guava",
            "White Flower": "groovy"
        },
        "Trees": {
            "label": {
                "Yellow fruit": "Pumpkin",
                "White Flower": "Bogan"
            }
        },
        "Numbers": [1, 2, 3, 4, 5],
        "Boolean": True,
        "Null": None
    }
}

j2_str = json.dumps(j2)

test_eq(val_at('app.Fruits.Yellow fruit.1.Banana.0', j2_str), 'Canary Island')
test_eq(val_at('app.Garden.Flowers.Red flower', j2_str), 'Rose')
test_eq(val_at('app.Numbers.2', j2_str), 3)
test_eq(val_at('app.Boolean', j2_str), True)
test_eq(val_at('app.Null', j2_str), None)
test_fail(lambda: val_at('app.NonExistent', j2_str))
test_fail(lambda: val_at('app.Fruits.Yellow fruit.3', j2_str))
test_is(val_at('app.Fruits.Yellow fruit.3', j2_str, None), None)

test_eq(key_at('app.Garden.Flowers.Red flower', j2_str), True)
test_eq(key_at('app.Numbers.2', j2_str), True)
test_eq(key_at('app.Fruits.Yellow fruit.1.Banana.0', j2_str), True)
test_eq(key_at('app.NonExistent', j2_str), False)
test_eq(key_at('app.Fruits.Yellow fruit.3', j2_str), False)

# vals_at
> Lookup values in json/mappings/sequences using dot notation with wildcards

In [None]:
# Private "not found" marker passed to `val_at` as its default
_E = object()

def vals_at(path, d) -> empty | tuple[empty | object, ...] | object:
    "Return nested values-- or empty|(empty, ...)-- at `path` with wildcards '*' from `d`."
    # Split at the first wildcard: `curr` is the literal prefix, `rest` whatever follows the '*'
    curr, wc, rest = str(path).partition('*')
    # No wildcard: plain lookup, with the private `_E` marker translated into `empty`
    if not wc and not rest:  return o if (o := val_at(curr, d, _E)) is not _E else empty
    # Resolve the prefix without its trailing '.'; a path that starts with '*' applies to `d` itself
    o = val_at(curr.rstrip('.'), d, _E) if curr else d
    # With a remainder, recurse into every element of `o`; without one, `o` is returned unchanged.
    # NOTE(review): when the prefix lookup failed, the `else o` branch returns the `_E` marker
    # itself rather than `empty` -- confirm whether that is intended.
    try: return (tuple(map(lambda x: empty if (res := vals_at(rest.lstrip('.'), x)) is _E else res, o))  # type: ignore
        if rest and o is not _E else o)
    # A `TypeError` (e.g. `o` is not iterable) is reported as `empty`
    except TypeError: return empty

test_eq(vals_at('a', object), empty)

In [None]:
d = [
    {'a': 1, 'b': [2,  3],  'c': {'d':   4}}, 
    {'a': 5, 'b': [6,  7],                  'd': [{'e': 81}, {'e': 82}]}, 
    {'a': 9, 'b': [10, 11], 'c': {'d': 12}}
]

test_eq(vals_at('', d), empty)
test_eq(vals_at(object, d), empty)
test_eq(vals_at('*', d), (*d,))
test_eq(vals_at(2, d), d[2])
test_eq(vals_at('a', d), empty)
test_eq(vals_at('*.a', []), ())
test_eq(vals_at('*.a.*.b', [{'a': object}]), (empty,))

test_eq(vals_at('*,a', d), (empty, empty, empty))
test_eq(vals_at('*.a', d), (1, 5, 9))
test_eq(vals_at('*.a.*', d), (1,5,9))
test_eq(vals_at('*.b.1', d), (3, 7, 11))
test_eq(vals_at('*.c.d', d), (4, empty, 12))

test_eq(vals_at('*.d.*.e', d), (empty, (81, 82), empty))
test_eq(vals_at('*.d.*.f', d), (empty, (empty, empty), empty))
test_eq(vals_at('1.d.*.e', d), (81, 82))

In [None]:
vals_at('*', d)

[{'a': 1, 'b': [2, 3], 'c': {'d': 4}},
 {'a': 5, 'b': [6, 7], 'd': [{'e': 81}, {'e': 82}]},
 {'a': 9, 'b': [10, 11], 'c': {'d': 12}}]

# val_at, val_atpath

In [None]:
#| export

def val_at(o, attr: str, default: Any=empty, sep='.'):
    """Traverse nested `o` looking for attributes/items specified in dot-separated `attr`.

    Each `sep`-separated part of `attr` is tried as an item key (`o[part]`), then as an
    integer index (`o[int(part)]`), then as an attribute (`getattr(o, part)`).

    Returns the value reached, or `default` when a part cannot be resolved and a `default`
    other than `empty` was given.
    Raises `TypeError` if `attr` is not a string (regardless of `default`), and
    `AttributeError` if a part cannot be resolved and no `default` was given."""
    if not isinstance(attr, str): raise TypeError(f'{attr=!r} is not a string')
    try:
        for a in attr.split(sep):
            try: o = o[a]
            # IndexError is included, as in `val_atpath`: a container that rejects a string
            # index with IndexError must still get the integer-index and attribute fallbacks
            except (IndexError, TypeError, KeyError):
                try: o = o[int(a)]
                except (IndexError, TypeError, KeyError, ValueError): o = getattr(o, a)
    except AttributeError:
        # The attribute lookup is the last resort, so its failure means "not found"
        if default is empty: raise
        return default
    return o

def val_atpath(o, *path: Any,  default: Any=empty):
    """Traverse nested `o` one `path` element at a time and return the value reached.

    Each element is tried as an item key, then as an integer index (`int(element)`), then as
    an attribute named `str(element)`. If the traversal ends in `AttributeError` or
    `TypeError`, `default` is returned when one was given (anything but `empty`);
    otherwise the error propagates."""
    def _descend(node, key):
        # Item access first, integer position second, attribute last
        try: return node[key]
        except (IndexError, TypeError, KeyError): pass
        try: return node[int(key)]
        except (IndexError, TypeError, KeyError, ValueError): return getattr(node, str(key))
    try:
        for key in path: o = _descend(o, key)
    except (AttributeError, TypeError):
        if default is empty: raise
        return default
    return o

_NF = object()

def has_key(o, attr: str, sep='.') -> bool:
    "Tell whether `val_at` can resolve the `sep`-separated `attr` inside `o`."
    found = val_at(o, attr, default=_NF, sep=sep)
    return found is not _NF

def has_path(o, *path: Any) -> bool:
    "Return `True` if `val_atpath` can resolve every element of `path`, in order, inside `o`."
    # Unpack `path`: passing the tuple itself would make the whole tuple a single lookup step
    return val_atpath(o, *path, default=_NF) is not _NF

In [None]:
test_fail(lambda: val_at({}, ''))
test_fail(lambda: val_at([], ''))
test_fail(lambda: val_at(object, ''))
test_fail(lambda: val_at({}, 'a.b'))
test_fail(lambda: val_at([], 'a.b'))
test_fail(lambda: val_at(object, 'a.b')) 

test_fail(lambda: val_at({'a': 1}, 'a.b'))
test_fail(lambda: val_atpath({'a': 1}, 'a', 'b'))
test_eq(val_at({'a': 1}, 'a.b', None), None)

In [None]:
d = [
    {'a': 1, 'b': [2,  3], 'c': {'d':   4}}, 
    {'a': 5, 'b': [6,  7],                 'd': [{'e': 81}, {'e': 82}]}, 
    {'a': 9, 'b': [10, 11], 'c': {'d': 12}}
]

test_fail(lambda: val_at(d, ''))
test_fail(lambda: val_at(d, object))  # type: ignore

test_fail(lambda: val_at(object, ''))

test_eq(val_at(d, '0.a'), 1)
test_eq(val_at(d, '1.b'), [6, 7])
test_eq(val_at(d, '2.c'), {'d': 12})

In [None]:
j2 = {
    "app": {
        "Garden": {
            "Flowers": {
                "Red flower": "Rose",
                "White Flower": "Jasmine",
                "Yellow Flower": "Marigold"
            }
        },
        "Fruits": {
            "Yellow fruit": ["Mango", {"Banana": ["Canary Island", "Puerto Rico"]}],
            "Green fruit": "Guava",
            "White Flower": "groovy"
        },
        "Trees": {
            "label": {
                "Yellow fruit": "Pumpkin",
                "White Flower": "Bogan"
            }
        },
        "Numbers": [1, 2, 3, 4, 5],
        "Boolean": True,
        "Null": None
    }
}

j2_str = j2#json.dumps(j2)

test_eq(val_at(j2_str, 'app.Fruits.Yellow fruit.1.Banana.0'), 'Canary Island')
test_eq(val_at(j2_str, 'app.Garden.Flowers.Red flower'), 'Rose')
test_eq(val_at(j2_str, 'app.Numbers.2'), 3)
test_eq(val_at(j2_str, 'app.Boolean'), True)
test_eq(val_at(j2_str, 'app.Null'), None)
test_fail(lambda: val_at(j2_str, 'app.NonExistent'))
test_fail(lambda: val_at(j2_str, 'app.Fruits.Yellow fruit.3'))
test_is(val_at(j2_str, 'app.Fruits.Yellow fruit.3', None), None)

In [None]:
test_eq(has_key(j2_str, 'app.Garden.Flowers.Red flower'), True)
test_eq(has_key(j2_str, 'app.Numbers.2'), True)
test_eq(has_key(j2_str, 'app.Fruits.Yellow fruit.1.Banana.0'), True)
test_eq(has_key(j2_str, 'app.NonExistent'), False)
test_eq(has_key(j2_str, 'app.Fruits.Yellow fruit.3'), False)

In [None]:
test_fail(lambda:val_at(None, '0'))
test_eq(val_at(None, '0', None), None)
test_fail(lambda:val_atpath(None, 2))
test_eq(val_atpath(None, 2, default=None), None)
test_fail(lambda:val_atpath(None, -1))
test_eq(val_atpath(None, -1, default=None), None)
test_fail(lambda:val_at(None, '0.1'))
test_eq(val_at(None, '0.1', None), None)
test_fail(lambda:val_atpath(None, 4, 'b'))
test_eq(val_atpath(None, 4, 'b', default=None), None)

test_fail(lambda:val_atpath([], 1))
test_eq(val_atpath([], 1, default=None), None)
test_fail(lambda:val_atpath([], 'a', 2))
test_eq(val_atpath([], ('a', 2), default=None), None)

test_fail(lambda:val_atpath({}, 1))
test_eq(val_atpath({}, 1, default=None), None)
test_fail(lambda:val_atpath(object(), '2'))
test_eq(val_at(object(), '0', None), None)
test_fail(lambda:val_at(object(), 'a.1'))
test_eq(val_at(object(), 'a.1', None), None)

In [None]:
o = [1, 2, 3, [4, 5, 6], 7]
test_eq(val_at(o, '0'), 1)
test_eq(val_at(o, '-1'), 7)
test_eq(val_atpath(o, -1), 7)
test_eq(val_atpath(o, 2), 3)
test_eq(val_atpath(o, 3, 2), 6)
test_eq(val_atpath(o, 5, default=None), None)
test_eq(val_at(o, '3.4', None), None)

o = dict(a=1, b=2, c=3, d=dict(e=4, f=5), g=6)
test_eq(val_atpath(o, 'b'), 2)
test_eq(val_atpath(o, 'd', 'f'), 5)
test_eq(val_at(o, 'd.f'), 5)
test_eq(val_at(o, 'd.g', None), None)

(s := json.dumps(['foo', (1,2,3), {'bar': ('baz', None, 1.0, 2)}]))
o = json.loads(s)
test_eq(val_at(o, '0'), 'foo')
test_eq(val_atpath(o, 0), 'foo')
test_eq(val_at(o, '2.bar'), ['baz', None, 1.0, 2])
test_eq(val_atpath(o, 2, 'bar'), ['baz', None, 1.0, 2])
test_eq(val_at(o, '2.bar.3'), 2)
test_eq(val_at(o, '3.bar', None), None)
test_eq(val_atpath(o, 2, 'foo', default=None), None)

@dataclasses.dataclass
class _D:
    a: int
    b: str
    c: float
    d: dict[str, Any]
_d = _D(1, '2', 3.0, {'e': 4, 'f': '5', 'g': 6.0})

o = [0, 1, _d, 'a']
test_eq(val_atpath(o, 1), 1)
test_eq(val_at(o, '3'), 'a')
test_eq(val_atpath(o, 2), _d)
test_eq(val_at(o, '2.c'), 3.0)
test_eq(val_at(o, '2.d.f'), '5')

# vals_at, vals_atpath
> Poor man's JSONPath.

In [None]:
#| export

def _vals_atpath(o, *path: Any, filter_empty=False) -> empty | tuple[empty | object, ...] | object:
    """Resolve `path` inside `o`, fanning out over every element at each '*' step.

    Without a wildcard this is `val_atpath` with a missing path reported as `empty`.
    Otherwise the part before the first '*' is resolved, and the part after it is applied
    recursively to each element of that value, giving a tuple with `empty` for unmatched
    elements (dropped when `filter_empty` is true). A tuple in which nothing matched
    collapses to `empty`. A trailing '*' returns the resolved value unchanged."""
    try:
        idx = path.index('*'); pre, pos = path[:idx], path[idx+1:]
    except ValueError:
        return a if (a := val_atpath(o, *path, default=_NF)) is not _NF else empty
    a = val_atpath(o, *pre, default=_NF) if pre else o
    # Test for a missing prefix before anything else, so that the private `_NF` marker can
    # never be handed back to the caller (it used to leak when '*' was the last step)
    if a is _NF: return empty
    if not pos: return a
    try:
        res = tuple(_vals_atpath(x, *pos, filter_empty=filter_empty) for x in a)  # type: ignore
        if all(x is empty for x in res): return empty
        return tuple(x for x in res if x is not empty) if filter_empty else res
    # e.g. the resolved value is not iterable
    except (AttributeError, TypeError): return empty

def vals_atpath(o, *path: Any, filter_empty=False) -> tuple[Any, ...]:
    """Collect the values found at `path` inside `o`; a '*' element fans out over every item at that level.

    Without a wildcard the result is a 1-tuple holding the value, or `()` when the path is
    missing. With wildcards the result of `_vals_atpath` is returned, with `empty` turned
    into `()`; unmatched branches appear as `empty` unless `filter_empty` is true."""
    if '*' in path:
        found = _vals_atpath(o, *path, filter_empty=filter_empty)
        return () if found is empty else found  # type: ignore
    hit = val_atpath(o, *path, default=_NF)
    return () if hit is _NF else (hit,)

def _vals_at(o, path:str, filter_empty=False) -> empty | tuple[empty | object, ...] | object:
    """Resolve dot-separated `path` inside `o`, fanning out over every element at each '*'.

    Without a wildcard this is `val_at` with a missing path reported as `empty`.
    Otherwise the text before the first '*' is resolved, and the text after it is applied
    recursively to each element of that value, giving a tuple with `empty` for unmatched
    elements (dropped when `filter_empty` is true). A tuple in which nothing matched
    collapses to `empty`. A trailing '*' returns the resolved value unchanged."""
    pre, wc, pos = str(path).partition('*')
    if not wc: return a if (a := val_at(o, pre, _NF)) is not _NF else empty
    # Strip the '.' that joined the prefix to the wildcard; a leading '*' applies to `o` itself
    a = val_at(o, pre.rstrip('.'), _NF) if pre else o
    # Test for a missing prefix before anything else, so that the private `_NF` marker can
    # never be handed back to the caller (it used to leak when '*' ended the path)
    if a is _NF: return empty
    if not pos: return a
    try:
        res = tuple(_vals_at(x, pos.lstrip('.'), filter_empty=filter_empty) for x in a)  # type: ignore
        if all(x is empty for x in res): return empty
        return tuple(x for x in res if x is not empty) if filter_empty else res
    # e.g. the resolved value is not iterable
    except TypeError: return empty

def vals_at(o, path:str, filter_empty=False) -> tuple[Any, ...]:
    """Collect the values found at dot-separated `path` inside `o`; '*' fans out over every item at that level.

    Without a wildcard the result is a 1-tuple holding the value, or `()` when the path is
    missing. With wildcards the result of `_vals_at` is returned, with `empty` turned into
    `()`; unmatched branches appear as `empty` unless `filter_empty` is true."""
    if '*' in path:
        found = _vals_at(o, path, filter_empty=filter_empty)
        return () if found is empty else found  # type: ignore
    hit = val_at(o, path, _NF)
    return () if hit is _NF else (hit,)


In [None]:
test_eq(vals_atpath(object, ''), ())
test_eq(vals_atpath(object, 'a'), ())
test_eq(vals_atpath(object, '*', 'a'), ())
test_eq(vals_atpath(object, '*', 'a', '*'), ())

test_eq(vals_atpath(['a', 'b'], 'a'), ())
test_eq(vals_atpath(['a', 'b'], '*'), ('a', 'b'))
test_eq(vals_atpath(['a', 'b'], '*', 'a'), ())

test_eq(vals_atpath(['a', 'b'], 0), ('a',))
test_eq(vals_atpath(['a', 'b'], 2), ())

test_eq(vals_atpath([{'a':1}], 'a'), ())
test_eq(vals_atpath([{'a':1}], '*'), [{'a':1}])
test_eq(vals_atpath([{'a':1}, {'a':2}], '*', 'a'), (1, 2))

In [None]:
test_eq(vals_at(object, ''), ())
test_eq(vals_at(object, 'a'), ())
test_eq(vals_at(object, '*.a'), ())
test_eq(vals_at(object, '*.a.*'), ())

test_eq(vals_at(['a', 'b'], 'a'), ())
test_eq(vals_at(['a', 'b'], '*'), ('a', 'b'))
test_eq(vals_at(['a', 'b'], '*.a'), ())

test_eq(vals_at(['a', 'b'], '0'), ('a',))
test_eq(vals_at(['a', 'b'], '2'), ())

test_eq(vals_at([{'a':1}], 'a'), ())
test_eq(vals_at([{'a':1}], '*'), [{'a':1}])
test_eq(vals_at([{'a':1}, {'a':2}], '*.a'), (1, 2))

In [None]:
d = [
    {'a': 1, 'b': [2,  3],  'c': {'d':   4}}, 
    {'a': 5, 'b': [6,  7],                  'd': [{'e': 81}, {'e': 82}]}, 
    {'a': 9, 'b': [10, 11], 'c': {'d': 12}}
]

test_eq(vals_atpath(d, ''), ())
test_eq(vals_atpath(d, object), ())  # type: ignore
test_eq(vals_atpath(d, '*'), (*d,))
test_eq(vals_atpath(d, '2'), (d[2],))
test_eq(vals_atpath(d, 'a'), ())
test_eq(vals_atpath([], '*', 'a'), ())#())
test_eq(vals_atpath([{'a': object}], '*', 'a', '*', 'b'), ())

test_eq(vals_atpath(d, '*', 'a'), (1, 5, 9))
test_eq(vals_atpath(d, '*', 'a', '*'), (1,5,9))
test_eq(vals_atpath(d, '*', 'b', 1), (3, 7, 11))
test_eq(vals_atpath(d, '*', 'c', 'd'), (4, empty, 12))
test_eq(vals_atpath(d, '*', 'c', 'd', filter_empty=True), (4, 12))

test_eq(vals_atpath(d, '*', 'd', '*', 'e'), (empty, (81, 82), empty))
test_eq(vals_atpath(d, '*', 'd', '*', 'e', filter_empty=True), ((81, 82),))
test_eq(vals_atpath(d, '*', 'd', '*', 'f'), ())
test_eq(vals_atpath(d, 1, 'd', '*', 'e'), (81, 82))

In [None]:
d = [
    {'a': 1, 'b': [2,  3],  'c': {'d':   4}}, 
    {'a': 5, 'b': [6,  7],                  'd': [{'e': 81}, {'e': 82}]}, 
    {'a': 9, 'b': [10, 11], 'c': {'d': 12}}
]

test_eq(vals_at(d, ''), ())
test_fail(lambda: vals_at(d, object))  # type: ignore
test_eq(vals_at(d, '*'), (*d,))
test_eq(vals_at(d, '2'), (d[2],))
test_eq(vals_at(d, 'a'), ())
test_eq(vals_at([], '*.a'), ())#())
test_eq(vals_at([{'a': object}], '*.a.*.b'), ())

test_eq(vals_at(d, '*,a'), ())
test_eq(vals_at(d, '*.a'), (1, 5, 9))
test_eq(vals_at(d, '*.a.*'), (1,5,9))
test_eq(vals_at(d, '*.b.1'), (3, 7, 11))
test_eq(vals_at(d, '*.c.d'), (4, empty, 12))
test_eq(vals_at(d, '*.c.d', filter_empty=True), (4, 12))

test_eq(vals_at(d, '*.d.*.e'), (empty, (81, 82), empty))
test_eq(vals_at(d, '*.d.*.e', filter_empty=True), ((81, 82),))
test_eq(vals_at(d, '*.d.*.f'), ())
test_eq(vals_at(d, '1.d.*.e'), (81, 82))

# deep_in

In [None]:
def deep_in(o:Mapping|Iterable, val):
    """Return `True` if `val` is among the values of `o` or of any collection nested in it.

    This first version is the baseline that the `deep_in_v*` variants further down are
    checked and timed against. Mapping keys are never searched, only values.
    Raises `ValueError` when `o` is neither a `Mapping` nor an `Iterable`."""
    if isinstance(o, Mapping):
        # Direct hit among the values, else descend into every value that is itself a collection
        return val in o.values() or any(deep_in(v, val) for v in o.values() if isinstance(v, (Mapping, Iterable)))
    elif isinstance(o, Iterable):
        # NOTE(review): `str` elements are `Iterable` too, so they are recursed into here;
        # a one-character string yields itself when iterated, which looks like it can recurse
        # without end -- confirm before using this version on data containing strings.
        return val in o or any(deep_in(v, val) for v in o if isinstance(v, (Mapping, Iterable)))
    raise ValueError(f"deep_in: o must be a Mapping or Iterable, got {type(o)}")

In [None]:
test_fail(lambda:deep_in(None, 1))  # type: ignore
test_eq(deep_in((), 1), False)
test_eq(deep_in((1,), 1), True)
test_eq(deep_in((1,), 2), False)
test_eq(deep_in({'a': 1}, 1), True)
test_eq(deep_in({'a': 1}, 2), False)
test_eq(deep_in({'a': (1,)}, 1), True)
test_eq(deep_in({'a': (1,)}, 2), False)
test_eq(deep_in({'a': {'b': 1}}, 1), True)
test_eq(deep_in({'a': {'b': 1}}, 2), False)
test_eq(deep_in({'a': {'b': (1,)}}, 1), True)
test_eq(deep_in({'a': {'b': (1,)}}, 2), False)

In [None]:
def deep_in_v1(o:Mapping|Iterable, val):
    """Return `True` if `val` is among the values of `o` or of any listy value nested in it.

    Variant 1: walks a mapping's values once, testing and descending in the same loop, and
    uses `is_listy` (so strings and bytes are never descended into).
    Raises `ValueError` when `o` is neither a `Mapping` nor an `Iterable`."""
    if isinstance(o, Mapping):
        for v in o.values():
            # Equality test and recursion share one pass over the values
            if v == val or (is_listy(v) and deep_in_v1(v, val)): return True
        return False
    elif isinstance(o, Iterable):
        # Membership test on this level first, nested listy elements afterwards
        if val in o: return True
        return any(deep_in_v1(v, val) for v in o if is_listy(v))
    raise ValueError(f"deep_in: o must be a Mapping or Iterable, got {type(o)}")

In [None]:
def deep_in_v2(o:Mapping|Iterable, val):
    """Return `True` if `val` is among the values of `o` or of any listy value nested in it.

    Variant 2: iterative instead of recursive; pending collections wait in a `deque`.
    Raises `ValueError` when a collection taken from the deque is neither a `Mapping`
    nor an `Iterable`."""
    from collections import deque
    # Despite the name, `popleft` + `extend` makes this a FIFO queue (level-by-level order)
    stack = deque([o])
    while stack:
        curr = stack.popleft()
        if isinstance(curr, Mapping):
            if val in curr.values(): return True
            # Queue only the listy values for a later look
            stack.extend(v for v in curr.values() if is_listy(v))
        elif isinstance(curr, Iterable):
            if val in curr: return True  
            stack.extend(v for v in curr if is_listy(v))
        else:
            raise ValueError(f"deep_in: o must be a Mapping or Iterable, got {type(curr)}")
    return False

In [None]:
def deep_in_v3(o:Mapping|Iterable, val):
    """Return `True` if `val` is among the values of `o` or of any listy value nested in it.

    Variant 3: a generator streams every candidate value and a single `in` test consumes
    it, stopping at the first match.
    Raises `ValueError` when a visited object is neither a `Mapping` nor an `Iterable`."""
    def _search(obj):
        # Yield this level's values first, then the values of each nested listy value
        if isinstance(obj, Mapping):
            yield from obj.values()
            for v in obj.values():
                if is_listy(v): yield from _search(v)
        elif isinstance(obj, Iterable):
            yield from obj
            for v in obj:
                if is_listy(v): yield from _search(v)
        else:
            raise ValueError(f"deep_in: o must be a Mapping or Iterable, got {type(obj)}")
    
    return val in _search(o)

In [None]:
test_data = [
    ((), 1), ((1,), 1), ((1,), 2), ({'a': 1}, 1), ({'a': 1}, 2),  
    ({'a': (1,)}, 1), ({'a': (1,)}, 2), ({'a': {'b': 1}}, 1), 
    ({'a': {'b': 1}}, 2), ({'a': {'b': (1,)}}, 1), ({'a': {'b': (1,)}}, 2)
]

for o, val in test_data:
    orig = deep_in(o, val)
    test_eq(deep_in_v1(o, val), orig)
    test_eq(deep_in_v2(o, val), orig) 
    test_eq(deep_in_v3(o, val), orig)

In [None]:
import time

deep_data = {'level1': {'level2': {'level3': {'level4': [1, 2, 3, {'deep': 'target'}]}}}}

def time_function(func, data, target, iterations=10000):
    "Return the elapsed seconds for `iterations` back-to-back calls of `func(data, target)`."
    # perf_counter is the monotonic, high-resolution clock meant for measuring intervals;
    # time.time() follows the system clock, which can be adjusted while the loop runs
    start = time.perf_counter()
    for _ in range(iterations):
        func(data, target)
    return time.perf_counter() - start

print("\nPerformance comparison (10000 iterations):")
target = 'target'
times = {}
times['original'] = time_function(deep_in, deep_data, target)
times['v1_single_iter'] = time_function(deep_in_v1, deep_data, target)
times['v2_stack_based'] = time_function(deep_in_v2, deep_data, target)
times['v3_generator'] = time_function(deep_in_v3, deep_data, target)

for name, t in times.items():
    speedup = times['original'] / t if t > 0 else float('inf')
    print(f"{name:15}: {t:.4f}s (speedup: {speedup:.2f}x)")


Performance comparison (10000 iterations):
original       : 0.0902s (speedup: 1.00x)
v1_single_iter : 0.0568s (speedup: 1.59x)
v2_stack_based : 0.0729s (speedup: 1.24x)
v3_generator   : 0.0779s (speedup: 1.16x)


In [None]:
#| export
def deep_in(o:Mapping|Iterable, val):
    """Return `True` if `val` is among the values of `o` or of any listy value nested in it.

    For a `Mapping` only the values are searched, never the keys. Strings and bytes nested
    inside `o` are compared as whole values and not descended into.
    Raises `ValueError` when `o` is neither a `Mapping` nor an `Iterable`."""
    if isinstance(o, Mapping):
        for v in o.values():
            if v == val or (is_listy(v) and deep_in(v, val)): return True
        return False
    elif isinstance(o, Iterable):
        # A one-shot iterator (e.g. a generator) is its own iterator: the membership test
        # below would exhaust it and leave nothing for the nested search, so snapshot it first
        if iter(o) is o: o = tuple(o)
        if val in o: return True
        return any(deep_in(v, val) for v in o if is_listy(v))
    raise ValueError(f"deep_in: o must be a Mapping or Iterable, got {type(o)}")

# Dict helpers

## pops_

In [None]:
#| export
def pops_(d: dict, *ks: Hashable) -> dict:
    "Remove from `d`, in place, every key of `ks` it contains and return the removed items as a new dict."
    popped = {}
    for k in ks:
        # Keys that are absent (or were already popped by an earlier duplicate) are skipped
        if k in d: popped[k] = d.pop(k)
    return popped

In [None]:
d = {'a': 1, 'b': 2, 'c': 3}
test_eq((pops_(d, 'a', 'b'), d), ({'a': 1, 'b': 2}, {'c': 3}))
test_eq(pops_({'a': 1, 'b': 2, 'c': 3}, 'd'), {})
test_eq(pops_({'a': 1, 'b': 2, 'c': 3}, 'a', 'c', 'd'), {'a': 1, 'c': 3})
test_eq(pops_({}, 'a'), {})
test_eq(pops_({'a': 1}, 'a', 'a'), {'a': 1})

## pops_values_

In [None]:
#| export
def pops_values_(d: dict, *ks: Hashable) -> tuple:
    "Pop each key of `ks` from `d` in place and return the values as a tuple, in `ks` order, with `Parameter.empty` standing in for missing keys."
    values = []
    for k in ks:
        values.append(d.pop(k, Parameter.empty))
    return tuple(values)

In [None]:
test_eq(pops_values_({'a': 1, 'b': 2, 'c': 3}, 'a', 'b'), (1, 2))
test_eq(pops_values_({'a': 1, 'b': 2, 'c': 3}, 'd'), (Parameter.empty,))
test_eq(pops_values_({'a': 1, 'b': 2, 'c': 3}, 'a', 'c', 'd'), (1, 3, Parameter.empty))
test_eq(pops_values_({}, 'a'), (Parameter.empty,))
test_eq(pops_values_({'a': 1}, 'a', 'a'), (1, Parameter.empty))

## gets

In [None]:
#| export

def gets(d: Mapping, *ks: Hashable):
    "Look up each key of `ks` in `d` and return the values as a tuple, in `ks` order, with `Parameter.empty` standing in for missing keys; `d` is left untouched."
    missing = Parameter.empty
    return tuple([d.get(k, missing) for k in ks])  # type: ignore

In [None]:
test_eq(gets({'a': 1, 'b': 2}, 'a', 'c', 'b'), (1, Parameter.empty, 2))
test_eq(gets({'a': 1, 'b': 2}), ())
a, b = gets({'a': 1, 'b': 2}, 'b', 'a')
test_eq((a, b), (2, 1))

d = {Path('a'): 1, Path('b'): 2}
test_eq(gets(d, Path('a'), Path('c'), Path('b')), (1, Parameter.empty, 2))

## update_

In [None]:
# def update_(d:dict|None=None, /, empty_value=None, **kwargs):
#     "Update `d` in-place with `kwargs` whose values aren't `empty_value`"
#     d = d if d is not None else {}
#     for k, v in kwargs.items():
#         if v is not empty_value: d[k] = v
#     return d

In [None]:
#| export

def update_(dest=None, /, empty_value=None, **kwargs) -> Any:
    """Write every keyword whose value differs (`!=`) from `empty_value` into `dest`, in place, and return `dest`.

    Values are stored as items when `dest` is a `MutableMapping` and as attributes
    otherwise; a new `dict` is created when `dest` is `None`."""
    if dest is None: dest = {}
    put = operator.setitem if isinstance(dest, MutableMapping) else setattr
    for name, value in kwargs.items():
        if value != empty_value: put(dest, name, value)
    return dest

Helper for conditionally updating dictionaries/namespaces.


In [None]:
d = {'a': 1}
update_(d, b=2, c=None)
test_eq(d, {'a': 1, 'b': 2})

d = {}
update_(d, a=True, b=Parameter.empty, empty_value=Parameter.empty)
test_eq(d, {'a': True})

d = update_(a=1, b=None)
test_eq(d, {'a': 1})

In [None]:
class A: a = 1; b = 2; c = 3
a:A = update_(A(), **{'b': 22, 'd': 44})
test_eq(a.a, 1)
test_eq(a.b, 22)
test_eq(a.c, 3)
test_eq(a.d, 44)  # type: ignore

In [None]:
ad = AD(a=1, b=2)
update_(ad, a=3, b=4, c=5)
test_eq(ad.a, 3)
test_eq(ad.b, 4)
test_eq(ad.c, 5)

# _get_globals

In [None]:
#| exporti

def _get_globals(mod: str):
    if hasattr(sys, '_getframe'):
        glb = sys._getframe(2).f_globals
    else:
        glb = sys.modules[mod].__dict__
    return glb

In [None]:
def _gtest(): return _get_globals(__name__)
g1 = _gtest()
g2 = globals()
test_eq(g1, g2)

# Bundle path

In [None]:
#| export

def bundle_path(mod:str|ModuleType):
    "Return the path to the module's directory or current directory."
    if isinstance(mod, str): mod = importlib.import_module(mod)
    return Path(fn).parent if (fn := getattr(mod, '__file__', None)) else Path()

In [None]:
import olio

# NOTE(review): presumably the module named by `__name__` has no `__file__` when run in a notebook, hence the current-directory fallback — confirm.
test_eq(bundle_path(__name__), Path('.'))
# For an importable package the result is the directory containing its `__file__`.
test_eq(bundle_path('olio').resolve(), Path(olio.__file__).parent)

# Kounter
> Counter of keys

In [None]:
#| export

class Kounter:
    "Per-key call counter: calling the instance with `k` increments the count for `k` and returns the new total."
    def __init__(self): self.d = DefaultDict(int)  # counts per key; unseen keys start at 0
    def __call__(self, k):
        self.d[k] += 1
        return self.d[k]

In [None]:
# NOTE(review): `kounter` is not used in this cell — confirm whether later cells rely on it.
kounter = Kounter()
cntr = Kounter()
# Three calls with 'a' and four with 'b', interleaved.
cntr('a')
cntr('b')
cntr('a')
cntr('a')
cntr('b')
cntr('b')
cntr('b')
test_eq(cntr.d, {'a': 3, 'b': 4})
# The first call for a new key returns 1.
test_eq(cntr('int'), 1)

# id_gen
> Generate session IDs

In [None]:
import random
import re
from pathlib import Path

# Candidate words, one per line of the word list.
lines = Path("static/wordlist.txt").read_text().splitlines()
# Strip *before* filtering: `isalpha()` is False for any string containing whitespace,
# so filtering first would drop padded lines instead of cleaning them.
words = [word for word in map(str.strip, lines) if word.isalpha()]

In [None]:
def modify_word(word):
    "Upper-case the first or second letter of `word` (chosen at random), then randomly append a number in 0–99."
    if len(word) <= 1:
        # Nothing to choose between: upper-case the whole (single-letter or empty) word.
        word = word.upper()
    else:
        pos = random.choice([0, 1])
        word = f"{word[:pos]}{word[pos].upper()}{word[pos + 1:]}"

    # Coin flip: leave the word as is, or add a numeric suffix (the number is only ever appended).
    if not random.choice([True, False]): return word
    return f"{word}{random.randint(0, 99)}"

def generate_readable_id(num_words=3):
    "Join `num_words` randomly picked, `modify_word`-altered words with '-' and append a random number in 0–9999."
    id_candidate = '-'.join(modify_word(random.choice(words)) for _ in range(num_words))

    # Prefix '_' when the candidate does not already look like a valid CSS identifier.
    looks_valid = re.match(r"^[a-zA-Z_][\w\-]*$", id_candidate)
    prefix = "" if looks_valid else "_"

    return f"{prefix}{id_candidate}-{random.randint(0, 9999)}"

In [None]:
# Sample output only: `random` is not seeded in this notebook's visible cells, so the ids differ on every run.
generate_readable_id(), generate_readable_id()

('lYing9-Partial-Deeply94-291', 'Independent16-aRrives85-cOmmented30-8284')

In [None]:
#| export

def simple_id():
    "Random id: 'b' followed by 16 random bytes as hex, in four '-'-separated groups of 8 digits."
    token = os.urandom(16)
    # `bytes.hex(sep, bytes_per_sep)` inserts '-' after every 4 bytes.
    return 'b' + token.hex('-', 4)

def id_gen():
    "Return an id factory: `f()`/`f(None)` gives a `simple_id()`; `f(o)` gives `<type name>_<n>`, with `n` counted per type name by this factory."
    counts = Kounter()  # one set of counters per factory
    def _(o:Any=None):
        if o is not None:
            # Count-based rather than hash-based, so unhashable objects work too.
            tname = type(o).__name__
            return f"{tname}_{counts(tname)}"
        return simple_id()
    return _

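The `id_gen` function returns a function that generates IDs unique within the current session. Called with an object, it returns `<type name>_<n>`, where `n` counts the calls made so far for that type name; called with no argument (or `None`), it returns a random hex ID. Useful for creating unique element IDs in dynamic HTML content.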


In [None]:
# With no argument, each call returns a fresh random id from `simple_id()`.
new_id = id_gen()
new_id(), new_id()

('b4fc95147-9d27295a-deb0cbc0-609281e7',
 'b518a0dc0-d0751c8e-31b27b66-5a3a2853')

In [None]:
# With an object, the id is the type name plus a per-type counter; the value itself is not used.
int_id = id_gen()
int_id(7), int_id(8)

('int_1', 'int_2')

In [None]:
# Every factory returned by `id_gen()` has its own counters.
obj_id = id_gen()
o1, o2 = object(), object()
print(obj_id(o1), obj_id(o2))

# Equal (and unhashable) objects still get distinct ids: only the call count matters.
dict_id = id_gen()
print(dict_id(d1 := {'a': 1}), dict_id(d2 := {'a': 1}))

# The prefix is the concrete type name of the argument.
pth_id = id_gen()
print(pth_id(Path('.')), pth_id(Path()), pth_id(Path('./bin')))

object_1 object_2
dict_1 dict_2
PosixPath_1 PosixPath_2 PosixPath_3


## WithCounterMeta

In [None]:
#| export

class WithCounterMeta(FC.FixSigMeta):
    "Metaclass giving each of its classes a `_cnt_` instance counter; every new instance stores the value `_cnt_` had when it was created."
    _cnt_: int
    def __new__(cls, name, bases, dict):
        # Each class created through this metaclass starts with its own counter at 0.
        klass = super().__new__(cls, name, bases, dict)
        klass._cnt_ = 0
        return klass
    def __call__(cls, *args, **kwargs):
        inst = super().__call__(*args, **kwargs)
        # Tag the instance with the current count, then advance the class counter.
        seen = cls._cnt_
        inst._cnt_ = seen
        cls._cnt_ = seen + 1
        return inst

In [None]:
class _T1(metaclass=WithCounterMeta): pass
class _T2(metaclass=WithCounterMeta): pass

# Every class starts with its own counter at 0.
test_is(_T1._cnt_, 0)
test_is(_T2._cnt_, 0)

# The first instance is tagged 0 and bumps only its own class's counter.
test_is(_T1()._cnt_, 0)
test_is(_T1._cnt_, 1)
test_is(_T2._cnt_, 0)

# Counters advance independently per class.
test_is(_T1()._cnt_, 1)
test_is(_T2()._cnt_, 0)
test_eq(_T1._cnt_, 2)
test_eq(_T2._cnt_, 1)

# Colophon
----

In [None]:
#|hide
#|eval: false
import fastcore.all as FC
import nbdev
from nbdev.clean import nbdev_clean

In [None]:
#|hide
#|eval: false

# Only when `FC.IN_NOTEBOOK` is set: run `nbdev_clean` on this notebook, then `nbdev.nbdev_export` on it.
if FC.IN_NOTEBOOK:
    nb_path = '00_basic.ipynb'
    nbdev_clean(nb_path)
    nbdev.nbdev_export(nb_path)