In [None]:
%%writefile src/stay/stay.py

from shlex import split
from enum import Enum
from collections.abc import Iterable
from typing import Union, Dict, List
from dataclasses import asdict, dataclass, is_dataclass

T = Enum("Token", "start key comment long list")
D = Enum("Directive", "")

class ParsingError(Exception):
    pass

class StateMachine:
    def __init__(self, *, states:dict, initial):       
        self.states = {}
        self.states.update(states)
        self.state = initial
        self.previous = initial
    
    def flux_to(self, to_state):
        try:
            if to_state in self.states[self.state]:                
                self.previous = self.state
                self.state = to_state
                return True
            else:
                return False
        except KeyError:
            return False
            
    def __call__(self):
        return self.state

def load(file):
    for x in loads(file.readlines()):
        yield x

        

def do_long(n, l, parser, current_value):
    if l.startswith(":::"):
        current[current_key] = "\n".join(current_value)
        parser.flux_to(parser.previous)
    else:
        # to escape ::: in a long value, everything else already is escaped
        if l.startswith("\:::"):
            l = l[1:]
        current_value.append(l.rstrip('\n'))
        
def loads(text: List[str], spaces_per_indent=4):   
    Parser = StateMachine(states={T.start:{T.long, T.start, T.key, T.comment, T.list},
                                  T.key: {T.long, T.key, T.comment, T.list},
                                  T.long: {T.long, T.start, T.key, T.comment, T.list},
                                  T.list: {T.long, T.start, T.key, T.comment, T.list},
                                  T.comment: {T.long, T.key, T.comment, T.start, T.list},
                                 },
                        initial=T.start)
    
    current = {}
    stack = []
    current_value = []
    current_key = None
    directives = set()

    def level(l):
        l = l.expandtabs(tabsize=spaces_per_indent)
        return (len(l) - len(l.lstrip()))//spaces_per_indent
    
    for n, l in enumerate(text):                
        # long values escape everything, even empty lines
        if (l.isspace() or not l) and Parser() is not T.long:
            continue
            
        # a short comment
        if l.startswith("#"):
            # long values escape comments
            if Parser() is T.long:
                current_value.append(l)
                continue
                
            if l.startswith("###"):
                # we may have a single "### heading ###"
                if len(l.split()) > 2 and l.endswith("###"):
                    continue
                elif Parser() is not T.comment:
                    Parser.flux_to(T.comment)
                else:
                    Parser.flux_to(Parser.previous)
            continue
        
        if Parser() is T.comment:
            continue
        
        if Parser() is T.long:
            if l.startswith(":::"):
                current[current_key] = "\n".join(current_value)
                Parser.flux_to(Parser.previous)
            else:
                # to escape ::: in a long value, everything else already is escaped
                if l.startswith("\:::"):
                    l = l[1:]
                current_value.append(l.rstrip('\n'))
            continue
        
        if Parser() is T.list:               
            if l.startswith("]"):
                current[current_key] = current_value
                Parser.flux_to(Parser.previous)
                continue
            
            if l.startswith("\]"):
                    l = l[1:]
            
            l = l.strip()

            # like a matrix
            if l.startswith("[") and l.endswith("]"):
                l = l[1:-1]
                l = split(l)

            current_value.append(l)
            continue
             
        # one might use more than 3 for aesthetics
        if l.startswith("===") or l.startswith("---"):
            Parser.flux_to(T.start)
            yield current
            current = {}
            continue
        
        if l.startswith("%"):
            D = split(l[1:])
            for x in D:
                if x.startswith("+"):
                    try:
                        d = getattr(D, x[1:])
                        directives.add(d)
                    except AttributeError:
                        raise ParsingError(f"No such directive: {x} (line {n})")
                elif d.startswith("-"):
                    try:
                        d = getattr(D, x[1:])
                        directives.discard(d)
                    except AttributeError:
                        raise ParsingError(f"No such directive: {x} (line {n})")
            continue
        
        k, _, v = l.partition(":")
        k, v = k.strip(), v.strip()
        
        if v == "::":
            Parser.flux_to(T.long)
            current_value = []
            current_key = k.strip()
            continue
        
        if v == "::[":
            Parser.flux_to(T.list)
            current_value = []
            current_key = k.strip()
            continue
        
        for x in range(abs(level(l) - len(stack))):
                prev, prev_k = stack.pop()
                prev[prev_k] = current
                current = prev
        
        if v == "":
            stack.append((current, k))
            current = {}
        else:
            # this implements a list of values, just use "[1 2 3 'foo bar' baz]" to get [1,2,3, "foo bar", baz]
            if v.startswith("[") and v.endswith("]"):
                v = v[1:-1]
                v = split(v)
            
            # simple values
            current[k] = v
    
    for _ in range(len(stack)):
        prev, prev_k = stack.pop()
        prev[prev_k] = current
        current = prev

    yield current
    
def __process(D:dict, level=0, spaces_per_indent=4):
    def do(k, v):
        if not isinstance(v, Iterable) or (isinstance(v, str) and "\n" not in v):
            l = f"{' ' * level * spaces_per_indent}{k}: {v}\n"

        elif isinstance(v, str) and "\n" in v:
            l = f"{' ' * level * spaces_per_indent}{k}:::\n{v}\n:::\n"

        elif isinstance(v, Iterable) and not isinstance(v, dict):
            l = f"{' ' * level * spaces_per_indent}{k}: [{' '.join(str(x) for x in v)}]\n"

        elif isinstance(v, dict):
            l = f"{' ' * level * spaces_per_indent}{k}:\n"
            for k, v in v.items():
                l += '\n'.join(str(x) for x in __process(k, v, level=level+1))
        else:
            raise UserWarning
        return l
    
    text = ''.join(do(k, v) for k, v in D.items())
    return text

def dumps(it:Union[Iterable, Dict, dataclass], spaces_per_indent=4):
    """Process an iterator of dictionaries as STAY documents, without comments."""
    it = [it] if isinstance(it, dict) else it
    it = [asdict(it)] if is_dataclass(it) else it
    
    output = "===\n".join(__process(asdict(D) if is_dataclass(D) else D) for D in it)
    return output

In [None]:
from shlex import split
from enum import Enum
from collections.abc import Iterable
from typing import Union, Dict, List
from dataclasses import asdict, dataclass, is_dataclass

T = Enum("Token", "start key comment long list")
D = Enum("Directive", "")

class ParsingError(Exception):
    pass

class StateMachine:
    def __init__(self, *, states:dict, initial):       
        self.states = {}
        self.states.update(states)
        self.state = initial
        self.previous = initial
    
    def flux_to(self, to_state):
        try:
            if to_state in self.states[self.state]:                
                self.previous = self.state
                self.state = to_state
                return True
            else:
                return False
        except KeyError:
            return False
            
    def __call__(self):
        return self.state

def load(file):
    for x in loads(file.readlines()):
        yield x

def loads(text: List[str], spaces_per_indent=4):   
    Parser = StateMachine(states={T.start:{T.long, T.start, T.key, T.comment, T.list},
                                  T.key: {T.long, T.key, T.comment, T.list},
                                  T.long: {T.long, T.start, T.key, T.comment, T.list},
                                  T.list: {T.long, T.start, T.key, T.comment, T.list},
                                  T.comment: {T.long, T.key, T.comment, T.start, T.list},
                                 },
                        initial=T.start)
    
    current = {}
    stack = []
    current_value = []
    current_key = None
    directives = set()

    def level(l):
        l = l.expandtabs(tabsize=spaces_per_indent)
        return (len(l) - len(l.lstrip()))//spaces_per_indent
    
    for n, l in enumerate(text):                
        # long values escape everything, even empty lines
        if (l.isspace() or not l) and Parser() is not T.long:
            continue
            
        # a short comment
        if l.startswith("#"):
            # long values escape comments
            if Parser() is T.long:
                current_value.append(l)
                continue
                
            if l.startswith("###"):
                # we may have a single "### heading ###"
                if len(l.split()) > 2 and l.endswith("###"):
                    continue
                elif Parser() is not T.comment:
                    Parser.flux_to(T.comment)
                else:
                    Parser.flux_to(Parser.previous)
            continue
        
        if Parser() is T.comment:
            continue
        
        if Parser() is T.long:
            if l.startswith(":::"):
                current[current_key] = "\n".join(current_value)
                Parser.flux_to(Parser.previous)
            else:
                # to escape ::: in a long value, everything else already is escaped
                if l.startswith("\:::"):
                    l = l[1:]
                current_value.append(l.rstrip('\n'))
            continue
        
        if Parser() is T.list:               
            if l.startswith("]:::"):
                current[current_key] = current_value
                Parser.flux_to(Parser.previous)
                continue
            
            if l.startswith("\]:::"):
                    l = l[1:]
            
            l = l.strip()

            # like a matrix
            if l.startswith("[") and l.endswith("]"):
                l = l[1:-1]
                l = split(l)

            current_value.append(l)
            continue
             
        # one might use more than 3 for aesthetics
        if l.startswith("===") or l.startswith("---"):
            Parser.flux_to(T.start)
            yield current
            current = {}
            continue
        
        if l.startswith("%"):
            D = split(l[1:])
            for x in D:
                if x.startswith("+"):
                    try:
                        d = getattr(D, x[1:])
                        directives.add(d)
                    except AttributeError:
                        raise ParsingError(f"No such directive: {x} (line {n})")
                elif d.startswith("-"):
                    try:
                        d = getattr(D, x[1:])
                        directives.discard(d)
                    except AttributeError:
                        raise ParsingError(f"No such directive: {x} (line {n})")
            continue
        
        k, _, v = l.partition(":")
        k, v = k.strip(), v.strip()
        
        if v == "::":
            Parser.flux_to(T.long)
            current_value = []
            current_key = k.strip()
            continue
        
        if v == "::[":
            Parser.flux_to(T.list)
            current_value = []
            current_key = k.strip()
            continue
        
        for x in range(abs(level(l) - len(stack))):
                prev, prev_k = stack.pop()
                prev[prev_k] = current
                current = prev
        
        if v == "":
            stack.append((current, k))
            current = {}
        else:
            # this implements a list of values, just use "[1 2 3 'foo bar' baz]" to get [1,2,3, "foo bar", baz]
            if v.startswith("[") and v.endswith("]"):
                v = v[1:-1]
                v = split(v)
            
            # simple values
            current[k] = v
    
    for _ in range(len(stack)):
        prev, prev_k = stack.pop()
        prev[prev_k] = current
        current = prev

    yield current
    
def __process(D:dict, level=0, spaces_per_indent=4):
    def do(k, v):
        if not isinstance(v, Iterable) or (isinstance(v, str) and "\n" not in v):
            l = f"{' ' * level * spaces_per_indent}{k}: {v}\n"

        elif isinstance(v, str) and "\n" in v:
            l = f"{' ' * level * spaces_per_indent}{k}:::\n{v}\n:::\n"

        elif isinstance(v, Iterable) and not isinstance(v, dict):
            l = f"{' ' * level * spaces_per_indent}{k}: [{' '.join(str(x) for x in v)}]\n"

        elif isinstance(v, dict):
            l = f"{' ' * level * spaces_per_indent}{k}:\n"
            for k, v in v.items():
                l += '\n'.join(str(x) for x in __process(k, v, level=level+1))
        else:
            raise UserWarning
        return l
    
    text = ''.join(do(k, v) for k, v in D.items())
    return text

def dumps(it:Union[Iterable, Dict, dataclass], spaces_per_indent=4):
    """Process an iterator of dictionaries as STAY documents, without comments."""
    it = [it] if isinstance(it, dict) else it
    it = [asdict(it)] if is_dataclass(it) else it
    
    output = "===\n".join(__process(asdict(D) if is_dataclass(D) else D) for D in it)
    return output
    
it = [{1:2},{2:3}]
dumps(it)

In [None]:
x = 2

def f(a):
    print(a*2)
    
f(x if x < 2 else 3)

In [1]:
from shlex import split
from enum import Enum
from collections.abc import Iterable
from typing import Union, Dict, List, Sequence
from dataclasses import asdict, dataclass, is_dataclass
from plugins import DRV, directives

T = Enum("Token", "start key comment long list dicts")

__version__ = 459

class ParsingError(Exception):
    pass

@dataclass
class State:
    token = T.start
    previous_token = None

    current = {}
    stack = []
    current_value = []
    current_key = None
    directives = set()

def do_list(n, line, st):
    if line.startswith(":::"):
        st.current[st.current_key] = "\n".join(st.current_value)
        st.token = st.previous_token
    else:
        # to escape ::: in a long value, everything else already is escaped
        if line.startswith(r"\:::"):
            line = line[1:]
        st.current_value.append(line.rstrip('\n'))

def do_long(n, line, st):
    if line.startswith(":::"):
        st.current[st.current_key] = "\n".join(st.current_value)
        st.token = st.previous_token

def do_comment(n, line, st):
    pass

def do_list(n, line, st):
    if line.startswith("]"):
        st.current[st.current_key] = st.current_value
        st.token = st.previous_token
        return
    
    if line.startswith(r"\]"):
            line = line[1:]
    
    line = line.strip()

    # like a matrix, for instance
    if line.startswith("[") and line.endswith("]"):
        line = line[1:-1]
        line = split(line)

    st.current_value.append(line)

def do_dicts(n, line, st):
    if line.startswith("}"):
        st.current[st.current_key] = st.current_value
        st.token = st.previous_token
        return
    
    if line.startswith(r"\}"):
            line = line[1:]
    line = line.strip()
    st.current_value.append(line)


cases = {T.comment: do_comment,
        T.long: do_long,
        T.list: do_list,
        T.dicts: do_dicts,
        }

cases.update(directives)

def load(file):
    for x in loads(file.readlines()):
        yield x

def loads(lines, spaces_per_indent=4) -> Sequence[dict]:
    lines = lines.splitlines() if isinstance(lines, str) else lines

    st = State()

    def level(line):
        line = line.expandtabs(tabsize=spaces_per_indent)
        return (len(line) - len(line.lstrip()))//spaces_per_indent
    
    for n, line in enumerate(lines):
        # long values escape everything, even empty lines
        if (line.isspace() or not line) and st.token is not T.long:
            continue
            
        # a short comment
        if line.startswith("#"):
            # long values escape comments
            if st.token is T.long:
                st.current_value.append(line)
                continue
                
            if line.startswith("###"):
                # we may have a single "### heading ###"
                if len(line.split()) > 2 and line.endswith("###"):
                    continue
                elif st.token is not T.comment:
                    st.previous_token = st.token
                    st.token = T.comment
                else:
                    st.token = st.previous_token
            continue
        
        try:
            print(3, st())
            cases[st()](n, line, st)
            continue
        except:
            pass

        # one might use more than 3 for aesthetics
        if line.startswith("===") or line.startswith("---"):
            if st.token is T.key:
                raise ParsingError(f"Key {st.current_key} but no value given.")
            else:
                st.token = T.start
            yield st.current
            st.current = {}
            continue
        
        if line.startswith("%"):
            D = split(line[1:])
            for x in D:
                if x.startswith("+"):
                    try:
                        d = getattr(D, x[1:])
                        directives.add(d)
                    except AttributeError:
                        raise ParsingError(f"No such directive: {x} (line {n})")
                elif d.startswith("-"):
                    try:
                        d = getattr(D, x[1:])
                        directives.discard(d)
                    except AttributeError:
                        raise ParsingError(f"No such directive: {x} (line {n})")
            continue
        

        k, _, v = line.partition(":")
        k = k.strip()
        # need to add a leading " if spaces must not be ignored
        if v.startswith('"'):
            v = v[1::]
        else:
            v = v.strip()
        
        if v == "::":
            st.previous_token = st.token
            st.token = T.long
            st.current_value = []
            st.current_key = k.strip()
            continue
        
        if v == "::[":
            st.previous_token = st.token
            st.token = T.list
            st.current_value = []
            st.current_key = k.strip()
            continue

        if v == "::{":
            st.previous_token = st.token
            st.token = T.dicts
            st.current_value = {}
            st.current_key = k.strip()
            continue
        
        for x in range(abs(level(line) - len(st.stack))):
                prev, prev_k = st.stack.pop()
                prev[prev_k] = st.current
                st.current = prev
        
        if v == "":
            st.stack.append((st.current, k))
            st.current = {}
        else:
            # this implements a list of values, just use "[1 2 3 'foo bar' baz]" to get [1,2,3, "foo bar", baz]
            if v.startswith("[") and v.endswith("]"):
                v = v[1:-1]
                v = split(v)
            
            # simple values
            st.current[k] = v
    
    for _ in range(len(st.stack)):
        prev, prev_k = st.stack.pop()
        prev[prev_k] = st.current
        st.current = prev

    yield st.current
    
def __process(D:dict, level=0, spaces_per_indent=4):
    def do(k, v):
        if not isinstance(v, Iterable) or (isinstance(v, str) and "\n" not in v):
            line = f"{' ' * level * spaces_per_indent}{k}: {v}\n"

        elif isinstance(v, str) and "\n" in v:
            line = f"{' ' * level * spaces_per_indent}{k}:::\n{v}\n:::\n"

        elif isinstance(v, Iterable) and not isinstance(v, dict):
            line = f"{' ' * level * spaces_per_indent}{k}: [{' '.join(str(x) for x in v)}]\n"

        elif isinstance(v, dict):
            line = f"{' ' * level * spaces_per_indent}{k}:\n"
            for k, v in v.items():
                line += '\n'.join(str(x) for x in __process(k, v, level=level+1))
        else:
            raise UserWarning
        return line
    
    text = ''.join(do(k, v) for k, v in D.items())
    return text

def dumps(it:Union[Iterable, Dict, dataclass], spaces_per_indent=4):
    """Process an iterator of dictionaries as STAY documents, without comments.
    On second thought, it would be cool to auto-add comments, making the file self-documenting.
    """
    it = [it] if isinstance(it, dict) else it
    it = [asdict(it)] if is_dataclass(it) else it
    
    output = "===\n".join(__process(asdict(D) if is_dataclass(D) else D) for D in it)
    return output



ModuleNotFoundError: No module named 'plugins'

In [None]:
f = lambda n, l: None

f(3,4)

In [6]:
s = "foo bar"

def parse(s):
    rest = s
    x = y = -1
    result = []
    
    def no_brackets():
        nonlocal rest
        _ = rest
        rest = ""
        return _

    
    def open_bracket():
        pass
    
    def close_bracket():
        pass
    
    def find_brackets():
        nonlocal x, y
        x, y = rest.find("["), rest.find("]")
        if x == y == -1:
            no_brackets()
        x = x if x != -1 else inf
        y = y if y != -1 else inf
        
        if x < y:
            open_bracket()
        if x > y:
            close_bracket()
    
    while len(rest) > 0:
        result.append(find_brackets())
    return result
        
parse(s)

['foo bar']

In [20]:
from math import inf

In [11]:
str.find?

In [4]:
t = """

x:::[
[1 2 3]
[2 3 4]
]
"""

list(loads(t))

NameError: name 'loads' is not defined

In [None]:
listoflists:::[
1 2 3
[foo bar]
foo bar
[4 5 6]
foo bar [7 8 9 [ß]] adf
]
["foo", "bar"]
"foo bar"

[["1 2 3",["4","5","6"], "foo bar", ["7","8","9", ["ß"]] "adf"]

In [None]:
%%writefile plugin_test.py

def 

In [None]:
%%writefile stay_plugin_test.py



In [None]:
def f(x):
    return x*3

def g(x):
    return x*5

def h(x):
    return x*7

cases = {frozenset()}

In [4]:
def f(x):
    if x > 2 and x <7:
        return x
    
def g(x):
    cases = {frozenset(list(range(2,7))): lambda: x}
    cases[x]()
    
g(3)

KeyError: 3

In [6]:
from enum import Enum

DRV = Enum("DRV", "key_CAP")

text = "% +key_CAP\nhallo: Sarah"
for l in text.splitlines():
    print(l)

DRV["key_CAP"]

% +key_CAP
hallo: Sarah


<DRV.key_CAP: 1>

In [1]:
"%:key_cap:".split(":")

['%', 'key_cap', '']

In [26]:
PRE_DRV = Enum("preline directive", "CAP")

PRE_DRV

<enum 'preline directive'>

In [25]:
def CAP():
    def do(line):
        return line.upper() 
    return do

def mul(num):
    def do(line):
        return line * num
    return do

PRE_DRV = Enum("preline directive", "CAP")

directives = {"CAP": CAP(), "MUL": mul(3)}
s = "asdf"
result = []
current = s
for d,f in directives.items():
    current = f(current)
    result.append((d, current))
result

[('CAP', 'ASDF'), ('MUL', 'ASDFASDFASDF')]

In [4]:
"adsf".upper()

'ADSF'

In [16]:
"%True%list_to_set% foo=234".split("%")

['', '+', 'list_to_set', ' foo=234']

In [None]:
cond = "list_to_set in DRV"

In [None]:
"%list_to_set:".partition

In [13]:
str.partition?

In [17]:
s = """
nc: http://release.niem.gov/niem/niem-core/4.0/#
j: http://release.niem.gov/niem/domains/jxdm/6.0/#
age: nc:PersonAgeMeasure
value: nc:MeasureIntegerValue
units: nc:TimeUnitCode
hairColor: j:PersonHairColorCode
name: nc:PersonName
given: nc:PersonGivenName
surname: PersonSurName
suffix: PersonNameSuffixText
nickname: PersonPreferredName"""

In [25]:
from urllib.parse import urlparse

d = {}

for l in s.splitlines():
    def expand(v):
        head, _, tail = v.partition(":")
        if _:
            url = urlparse(v)
            if not url.scheme or not url.netloc:
                try:
                    v = d[head] + tail
                except:
                    pass
        return v
        
    k, p, v = (p.strip() for p in l.partition(":"))
    v = expand(v)
    if p:
        d[k]= v

ParseResult(scheme='http', netloc='release.niem.gov', path='/niem/niem-core/4.0/', params='', query='', fragment='')
ParseResult(scheme='http', netloc='release.niem.gov', path='/niem/domains/jxdm/6.0/', params='', query='', fragment='')
ParseResult(scheme='nc', netloc='', path='PersonAgeMeasure', params='', query='', fragment='')
nc PersonAgeMeasure
ParseResult(scheme='nc', netloc='', path='MeasureIntegerValue', params='', query='', fragment='')
nc MeasureIntegerValue
ParseResult(scheme='nc', netloc='', path='TimeUnitCode', params='', query='', fragment='')
nc TimeUnitCode
ParseResult(scheme='j', netloc='', path='PersonHairColorCode', params='', query='', fragment='')
j PersonHairColorCode
ParseResult(scheme='nc', netloc='', path='PersonName', params='', query='', fragment='')
nc PersonName
ParseResult(scheme='nc', netloc='', path='PersonGivenName', params='', query='', fragment='')
nc PersonGivenName


In [26]:
d

{'nc': 'http://release.niem.gov/niem/niem-core/4.0/#',
 'j': 'http://release.niem.gov/niem/domains/jxdm/6.0/#',
 'age': 'http://release.niem.gov/niem/niem-core/4.0/#PersonAgeMeasure',
 'value': 'http://release.niem.gov/niem/niem-core/4.0/#MeasureIntegerValue',
 'units': 'http://release.niem.gov/niem/niem-core/4.0/#TimeUnitCode',
 'hairColor': 'http://release.niem.gov/niem/domains/jxdm/6.0/#PersonHairColorCode',
 'name': 'http://release.niem.gov/niem/niem-core/4.0/#PersonName',
 'given': 'http://release.niem.gov/niem/niem-core/4.0/#PersonGivenName',
 'surname': 'PersonSurName',
 'suffix': 'PersonNameSuffixText',
 'nickname': 'PersonPreferredName'}