In [2]:
%%writefile fluxx.py

"""
MIT License

Copyright (c) 2017-2018 Anselm Kiefner

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

from warnings import warn
from collections import Counter, defaultdict, OrderedDict
from copy import copy
from uuid import uuid4, UUID
from functools import wraps
from logging import getLogger, INFO, basicConfig
from inspect import isclass

logger = getLogger("fluxx")
logger.setLevel(INFO)

FORMAT = '%(message)s'
basicConfig(format=FORMAT)


class FluxException(RuntimeWarning):
    pass


class TraitBag:
    __slots__ = ["__weakref__", "traits", "bag", "name"]
    
    def __init__(self, name=None, *, 
                 in_traits=(), stereotypes=(), ex_traits=(), bag=()):
        """First set up stereotypes of traits as sets of Enum seperate from the class
        and add all the traits that should be included in a "class" of TraitBags. 
        Pass in those stereotypes and polish the rough edges by taking 
        away the traits that should be excluded and add ones that aren't included in stereotypes.
        
        You really DO NOT want to subclass this. 
        The point of TraitBag is that they only represent state, nothing more.
        Even more so, __slots__ don't play nice with subclassing 
        (a subclass creates its own dict, killing the whole point of __slots__), so, if you NEED
        a TraitBag with methods, use the TraitMixin for instance with StateMachine 
        (the TraitMachine takes a different approach all together).
        
        The bag is used with tokens (another Enum) for most simple use cases.
        If there's a need to extend a "class" of TraitBags, don't do it here.
        Instead, use some "global" WeakKeyDictionary or WeakValueDictionary to manage things.
        Keep in mind that in this case you will probably need one to 
        map from name to thing and then from thing to property.. and be VERY CAREFUL when you
        use more than one strong ref to the thing - otherwise the weakrefs WILL bite your butt!
        
        One more thing: the point of TraitBag.clone() is to be able to prototype.
        You can build one complex prototype of a TraitBag, then replicate 
        the bugger without any of the boilerplate. 
        Careful bout those global dicts though - the clone needs extra love.
        """
        self.name = uuid4() if name is None else name
        self.bag = Counter(bag)
        # unpack the list of stereotypes and sets of traits, yielding all traits
        # but since it's a SET-comprehension, return a set of traits
        self.traits = {t for S in stereotypes for t in S}
        self.traits.update(set(in_traits))
        self.traits.difference_update(set(ex_traits))
        
    def __str__(self):
        return str(self.name) if not isinstance(self.name, UUID) else self.name.hex[:5]
    
    def __eq__(self, other):
        return self.traits == other.traits and self.bag == other.bag
    
    def __repr__(self):
        return f"TraitBag(name={self.name}, in_traits={self.traits}, bag={self.bag})"
    
    def clone(self, name=None):
        c = copy(self)
        c.name = uuid4() if name is None else name
        return c


class StateMachine:
    """Most general freaking FSM.
    
    If you can think of anything that has discrete states, you can probably
    model it with this. You can register callback functions to when the 
    SM changes state. In case you want to have dynamic methods, you need to 
    implement a kind of dispatch mechanism. This is what the accept method is for -
    taking requests and dispatching them to other methods.

    It is also possible to work with State classes instead of Enums,
    but to keep things concise, you should try to work with reg_enter etc. instead
    of implementing enter() methods. Classes have the advantage that you can
    have state-dependent @staticmethods so that you can just call a common interface
    without worrying about dispatching.
    
    enters, exits are dicts of state to function that needs to be called on event.
    transitions is a dict of (state, state) that is called in transition of exactly those two states, 
    thus more specific than enters or exits.
    """
    def __init__(self, *, states:dict, initial, name=None, 
                 enters=None, exits=None, transitions=None,
                 history=None, tracing=False, bag=None):
        self.name = uuid4() if name is None else name
        self.bag = Counter()
        if bag is not None:
            self.bag.update(bag)
        
        self.states = {}
        self.states.update(states)
        self.state = initial
        self.previous = initial
        
        self.transitions = defaultdict(lambda: lambda: None)
        if transitions is not None:
            self.transitions.update(transitions)
        
        self.enters = defaultdict(lambda: lambda: None)
        if enters is not None:
            self.enters.update(enters)
        
        self.exits = defaultdict(lambda: lambda: None)
        if exits is not None:
            self.exits.update(exits)
        
        if tracing:
            self.history = [initial] if history is None else history
        self.tracing = tracing
    
    def accept(self, aim, *args):
        """
        Prototype for a dispatch mechanic. Should be overwritten in subclasses.
        
        In summary:
        Since a caller shouldn't worry (especially if async) about the state we are in,
        they make an educated guess what we can do and make a request in form of passing
        a token Enum with optional args.
        
        We don't expose our functions but instead dispatch the token here - see if we are in a state
        that can accept the token and if so, to which function it should go.
        
        It is important to maintain a consistent interface. 
        Usually this means only returning True or False, but if we need to chain such requests,
        message passing (for instance a namedtuple or dictionary as result) is recommended.
        
        This might require either passing a callback (cb) and errback (eb) function or 
        the instance of the actor.
        """
        raise NotImplementedError
    
    def flux_to(self, to_state):
        try:
            if to_state in self.states[self.state]:                
                # Exceptions aren't caught here because this might result in undefined behaviour
                # crash early - crash often
                self.exits[self.state]()
                self.transitions[(self.state, to_state)]()
                self.enters[to_state]()

                self.previous = self.state  # often used in user code
                self.state = to_state
                if isclass(to_state):
                    logger.info(f"{self.name} now {to_state.__name__}")
                else:
                    logger.info(f"{self.name} now {to_state}")
                if self.tracing:
                    self.history.append(to_state)
                return True
            else:
                warn(f"no valid transition from {self.state} to {to_state}", UserWarning)
                return False
        except KeyError:
            warn(f"{self.state} is final", UserWarning)
            return False
    
    @property
    def next_possible_states(self):
        try:
            return self.states[self.state]
        except KeyError:
            return set()
    
    def reg_enter(self, *args):
        """Register a function that is called when a state is entered.
        Can be used with a lambda or as decorator.
        like
        >>> sm = StateMachine(...)
        >>> @sm.reg_enter(S.foo)
        >>> def do_stuff_on_entry(self, e):
            ...
        or
        >>> sm.reg_enter(S.foo, lambda e: ...)
        """
        if len(args) == 2:
            state, func = args
            self.enters[state] = func
        if len(args) == 1:
            state = args[0]
            def decorator(func):
                self.enters[state] = func
            return decorator
    
    def reg_exit(self, *args):
        """Register a function that is called when a state is exited.
        Can be used with a lambda or as decorator.
        """
        if len(args) == 2:
            state, func = args
            self.exits[state] = func
        if len(args) == 1:
            state = args[0]
            def decorator(func):
                self.exits[state] = func
            return decorator
    
    def reg_transit(self, *args):
        """Register a function that is called during transit from one specific state to another.
        Can be used with a lambda or as decorator.
        """
        print(args, len(args))
        if len(args) == 3:
            self.transitions[(args[0], args[1])] = args[2]
        if len(args) == 2:
            from_state, to_state = args
            def decorator(func):
                self.transitions[(from_state, to_state)] = func
            return decorator
    
    def clone(self, name=None):
        c = copy(self)
        c.name = uuid4() if name is None else name
        
        c.states = {}
        c.states.update(self.states)
        
        c.transitions = defaultdict(lambda: lambda e: None)
        c.transitions.update(self.transitions)
        
        c.enters = defaultdict(lambda: lambda e: None)
        c.enters.update(self.enters)
        
        c.exits = defaultdict(lambda: lambda e: None)
        c.exits.update(self.exits)
        return c
    
    def __str__(self):
        return str(self.name) if not isinstance(self.name, UUID) else self.name.hex[:5]
    
    def __repr__(self):
        return f"{self.name} in {self.state} with {self.states}"
    
    def __mul__(self, number):
        for x in range(number):
            yield self.clone()
            
    def __call__(self):
        """This makes it possible to easily realize hierarchical statemachines with Collector
        by assuming that each FSM can be called to return its current state and applying
        the composition pattern. 
        This means we have a recursive structure of Collectors and StateMachines
        being the leaves of the call tree.
        """
        return self.state

class Collector(StateMachine):
    """Quite sophisticated Super-StateMachine.
    It can be used with anything that can be called, returning a valid state enum,
    including TraitMachines.

    Conditions are tricky. It's an ordered dictionary of frozensets of tuples 
    (machine, state) which points to a state. It's more convenient to just use
    c = Collector(...)
    m = StateMachine(...)
    c[(m, s1)] = s2

    Much easier that way and less error prone. The conditions are evaluated in 
    reversed order, so if you insist on some priority, 
    just make sure to add the more specific condition after the more general.
    """

    def __init__(self, *, states:dict, conditions=None, initial, name=None, default=None, **kwargs):
        super().__init__(states=states, initial=initial, name=name, **kwargs)
        
        self.conditions = OrderedDict()
        if conditions is not None:
            self.conditions.update(conditions)
        
        self.default = initial if default is None else default
    
    def __setitem__(self, key, value):
        try:
            m, s = key
            if isinstance(m, StateMachine):
                key = [key]
        except ValueError:
            pass
        self.conditions[frozenset(key)] = value
        self.recursion_check(self.conditions.keys())
    
    def recursion_check(self, conditions):
        # conditions = [frozenset((fsm, state),)]
        for c in conditions:
            for sm, s in c:
                if sm is self:
                    raise FluxException("Collector contains itself by recursion!")
                else:
                    if isinstance(sm, Collector):
                        self.recursion_check(sm.conditions.keys())
               

    def __delitem__(self, key):
        if isinstance(key, tuple):
            key = [key]       
        del self.conditions[frozenset(key)]
    
    def __call__(self):
        # since self.conditions is ordered, execution is not arbitrary
        for c, t_state in reversed(self.conditions.items()):
            if all(sm() is s for sm, s in c):
                self.flux_to(t_state)
                break
        # else is only executed if loop is completed
        else:
            self.flux_to(self.default)
        return self.state
                        
    def clone(self, name=None):
        # better alternative would be to implement __repr__ correctly,
        # then instantiate a new machine from that repr. 
        # However, that would require pickling recursive functions, which is non-trivial.
        
        new = copy(self)
        new.name = uuid4() if name is None else name
        
        new.states = {}
        new.states.update(self.states)
        
        new.transitions = defaultdict(lambda: lambda e: None)
        new.transitions.update(self.transitions)
        
        new.enters = defaultdict(lambda: lambda e: None)
        new.enters.update(self.enters)
        
        new.exits = defaultdict(lambda: lambda e: None)
        new.exits.update(self.exits)
        
        new.conditions = {}
        new.conditions.update(self.conditions)
        return new


class TraitMixin:
    """Use this in connection with another class get the benefit of a TraitBag
    without strings attached.
    
    >>> class Door(TraitMixin, StateMachine):
    >>>    pass
    """
    def __init__(self, *, in_traits=(), stereotypes=(), ex_traits=(), **kwargs):
        super().__init__(**kwargs)
        # unpack the list of stereotypes and sets of traits, yielding all traits
        # but since it's a SET-comprehension, return a set of traits
        self.traits = {t for S in stereotypes for t in S}
        self.traits.update(set(in_traits))
        self.traits.difference_update(set(ex_traits))


class TraitMachine(StateMachine):
    """The state machine paradigm taken the other way round.
    
    A statemachine whose state depends on its traits, compatible to Collector.
    You add or remove traits, which may flux its state when evaluated.
    
    This is particularly useful if you have a lot of moving parts and the luxury to postpone evaluation for a while
    like in an event loop.
    """
    def __init__(self, *, initial, default=None,
                 stereotypes=None, in_traits=None, ex_traits=None, 
                 trait_states=None, **kwargs):
        super().__init__(initial=initial, **kwargs)
        
        self.default = default
        
        stereotypes = [] if stereotypes is None else stereotypes
        in_traits = set() if in_traits is None else in_traits
        ex_traits = set() if ex_traits is None else ex_traits
        
        self.traits = {t for S in stereotypes for t in S}
        self.traits.update(set(in_traits))
        self.traits.difference_update(set(ex_traits))
        
        self.trait_states = {} if trait_states is None else trait_states
   
    def __call__(self):
        try:
            to_state = self.trait_states[frozenset(self.traits)]
            if to_state is not self.state:
                self.flux_to(to_state)
        except KeyError:
            if self.default is not None:
                self.flux_to(self.default)
        return self.state
        
    
class TraitStateMachine(StateMachine):
    """Pretty badass alternative to local dispatching.
    
    We define by set operations over enums which methods should be available
    for any given state. It doesn't matter where the methods are defined 
    (as long as it's not the same class, that would make things messy again) and we don't care
    about special dispatch logic because we decorate the functions with @can() to specify
    which Trait a thing should have to make the function applicable.
    This kills two birds with one stone: All the important logic is in one place - 
    the instantiation call for the TraitMachine - and functions can be implemented anywhere,
    as long as they adhere to the convention with can()-decorator and taking thing as main arg.
    """
    
    def __init__(self, *, initial,
                 stereotypes=None, in_traits=None, ex_traits=None, 
                 stateful_traits=None, **kwargs):
        super().__init__(initial=initial, **kwargs)
               
        stereotypes = [] if stereotypes is None else stereotypes
        in_traits = set() if in_traits is None else in_traits
        ex_traits = set() if ex_traits is None else ex_traits
        
        self.stateless_traits = {t for S in stereotypes for t in S}
        self.stateless_traits.update(set(in_traits))
        self.stateless_traits.difference_update(set(ex_traits))
        
        self.stateful_traits = defaultdict(lambda: set())
        if stateful_traits is not None:
            self.stateful_traits.update(stateful_traits)
         
        self.traits = self.stateless_traits | self.stateful_traits[self.state]      

    def __call__(self):
        self.traits = self.stateless_traits | self.stateful_traits[self.state]
        return self.state
        
    def flux_to(self, to_state):
        w = super().flux_to(to_state)
        self.traits = self.stateless_traits | self.stateful_traits[self.state]
        return w
        
def can(in_trait, ex_trait=None, observed=None):
    """Decorator for "case-dispatch" via logical set operations on traits 
    (which can be any kind of singleton like enums).
    
    If something should be able to call something, you give it a token aka trait.
    
    In case you want to check for any number of traits, this decorator 
    could easily be modified to take sets.
    """
    def wrapper(func):
        # Delegation of different wrappers for methods and standalone functions under the same umbrella. 
        # Also faster to do the check first and delegate to the correct function based on case than to check the case every call during runtime.
        
        # first we check if we've been given a thing to observe initially
        if observed:
            @wraps(func)
            def calling(*args, **kwargs):
                if in_trait in observed.traits and not ex_trait in observed.traits:
                    func(*args, **kwargs)
                else:
                    try:
                        warn(f"{func.__qualname__} isn't available while {observed.name} {observed.state} (called with {args} {kwargs})")
                    except AttributeError:
                        warn(f"{func.__qualname__} can't be called, {observed} has no {in_trait}")
        # next we might have a class method
        elif len(func.__qualname__.split('.')) > 1:
            @wraps(func)
            def calling(*args, **kwargs):
                self, thing, *_ = args
                if in_trait in thing.traits and not ex_trait in thing.traits:
                    func(*args, **kwargs)
                else:
                    try:
                        warn(f"{func.__qualname__} isn't available while {thing.name} {thing.state} (called with {args} {kwargs})")
                    except AttributeError:
                        warn(f"{func.__qualname__} can't be called, {thing} has no {in_trait}")
        # leaves us with a module-level function
        else:
            @wraps(func)
            def calling(*args, **kwargs):
                thing, *_ = args
                if in_trait in thing.traits and not ex_trait in thing.traits:
                    func(*args, **kwargs)
                else:
                    try:
                        warn(f"{func.__qualname__} isn't available while {thing.name} in {thing.state} (called with {args} {kwargs})")
                    except AttributeError:
                        warn(f"{func.__qualname__} can't be called, {thing} has no {in_trait}")
        return calling
    return wrapper

Overwriting fluxx.py


In [3]:
%load_ext autoreload

In [5]:
%autoreload 2

from fluxx import TraitBag
from collections import ChainMap
from enum import Enum

class req(Enum):
    GET = 1  # a representation of the target resource
    POST = 2  # create a resource or do something out of the box
    PUT = 3  # create or update a resource so that it is as specified
    PATCH = 4  # change part of a resource
    OPTIONS = 5  # request a list of allowed methods
    DELETE = 6  # remove a resource
    HEAD = 7  # request metadata
    CONNECT = 8  # try to connect via a proxy
    TRACE = 9  # a representation of the request message as received by the end server.
    
    def __call__(self, **data):
        data["VERB"] = self
        return data
    
class rep(Enum):
    OK = 200
    BadRequest = 400
    Forbidden = 403
    NotFound = 404
    MethodNotAllowed = 405
    Conflict = 409
    InternalServerError = 500

CAN = Enum("CAN", "walk talk be_drunk sleep")      

def walk(msg):
    print("walking..", msg["dist"])
    return msg
    
def walk_drunk(msg):
    print("walking drunk..", msg["dist"]/2)
    return msg

def talk(msg):
    print("bla")
    return msg

def talk_drunk(msg):
    print("talking crap..")
    return msg

def hic(msg):
    msg["dist"] -= 1
    print("hics!...")
    return msg

def sleep(msg):
    print("zzzzz...")
    return msg

class Test(TraitBag):
    def __init__(self, **args):
        self.methods = {CAN.walk: walk, 
                   CAN.talk: talk,
                  CAN.sleep: sleep}
        super().__init__(**args)
    
    def do(self, verb, msg):
        if CAN.be_drunk in self.traits:
            self.methods = ChainMap({CAN.walk: lambda req: hic(walk_drunk(req)),
                                    CAN.talk: talk_drunk,
                                     CAN.be_drunk: hic}, 
                                    self.methods)        
        if verb is req.OPTIONS:
            return self.methods
        
        func = self.methods[msg["act"]]
        return func(msg)
    
b = Test(in_traits={CAN.walk, CAN.talk, CAN.be_drunk})
b.do(req.POST(act=CAN.walk, dist=5))
b.do(req.POST(act=CAN.sleep))
b.traits -= {CAN.be_drunk}
b.do(req.POST(act=CAN.talk))

TypeError: do() missing 1 required positional argument: 'msg'

In [None]:
from collections import Counter

c = Counter()
c["a"] = 2
c["b"] = 3

max(c.items(), key=lambda i: i[1])

In [None]:
n = 1005

sum(int(x) for x in str(n))

In [None]:
from enum import Enum
from collections import Counter
from flux import StateMachine
from uuid import uuid4


class StateCollector:    
    def __init__(self, *, states:dict=None, initial, name=None, dominating=None):
        self.name = uuid4() if name is None else name
        self.states = {} if states is None else states
        self.state = initial
        self.votes = Counter()
        self.dominating = set() if dominate is None else set(dominating)
    
    def __setitem__(self, key, value):
        if isinstance(key, tuple):
            self.states[frozenset([key])] = value
        else:
            self.states[frozenset(key)] = value

    def __delitem__(self, key):
        if isinstance(key, tuple):
            del self.states[frozenset([key])]
        else:
            del self.states[frozenset(key)]
    
    def __call__(self):
        for c, t_state in self.states.items():
            try:
                if all(sm.state is s for sm, s in c):
                    self.votes[t_state] += 1
            except TypeError:
                sm, s = c
                if sm.state is s:
                    self.votes[t_state] += 1
        self.state = max(self.votes.items(), key=lambda i: i[1])[0]
        print(self.votes)
        return self.state
    
    def clone(self, name=None):
        c = copy(self)
        c.name = uuid4() if name is None else name
        return c
    
    def flux_to(self, to_state):
        try:
            if to_state in self.states[self.state]:
                self.state = to_state
                logger.info(f"{self.name} is now {self.state}")
                return True
            else:
                warn(f"no valid transition from {self.state} to {to_state}", UserWarning)
                return False
        except KeyError:
            warn(f"no transitions defined for {self.state}", UserWarning)
            return False

IS = Enum("State", "ON OFF")

s1 = StateMachine(states={IS.ON:{IS.ON, IS.OFF},
                          IS.OFF:{IS.OFF, IS.ON}}, initial=IS.OFF)
s2, s3 = s1*2

light = StateCollector(initial=IS.OFF)
light[[(s1, IS.ON), (s2, IS.ON), (s3, IS.ON)]] = IS.ON

light[(s1, IS.ON)] = IS.OFF


s1.flux_to(IS.ON)
s2.flux_to(IS.ON)
s3.flux_to(IS.ON)


light()

In [None]:
mouse appears
mouse runs away
mouse appears
mouse enters trap
mouse escapes
mouse appears
mouse enters trap
mouse trapped
mouse removed
mouse appears
mouse runs away
mouse appears
mouse enters trap
mouse trapped
mouse removed

In [None]:
from flux import StateMachine
from enum import Enum
from logging import getLogger, INFO, WARNING, basicConfig

logger = getLogger("flux")
logger.setLevel(WARNING)

act = Enum("Action", "init appears runs enters_trap escapes trapped dies")

mouse = StateMachine(states={
                            act.init: {act.appears},
                            act.appears:{act.runs, act.enters_trap}, 
                             act.enters_trap:{act.trapped, act.escapes},
                            act.trapped:{act.escapes, act.dies},
                            act.escapes:{act.appears, act.enters_trap, act.runs},
                            act.runs:{act.appears, act.runs, act.enters_trap}
                            }, 
                     initial=act.init, 
                     name="mouse",
                     tracing=True
                    )

mouse.register(act.appears, lambda e: print("mouse appears.."))
mouse.register(act.enters_trap, lambda e: lambda e: print("trap hits!"))
mouse.register(act.trapped, lambda e: print("mouse is trapped :("))
mouse.register(act.escapes, lambda e: print("mouse was able to escape!"))
mouse.register(act.runs, lambda e: print("mouse runs away!"))
mouse.register(act.dies, lambda e: print("mouse dies a horrible death in the trap :<"))

mouse.flux_to(act.appears)
mouse.flux_to(act.runs)
mouse.flux_to(act.enters_trap)
mouse.flux_to(act.escapes)
mouse.flux_to(act.appears)
mouse.flux_to(act.enters_trap)
mouse.flux_to(act.trapped)
mouse.flux_to(act.escapes)
mouse.flux_to(act.runs)
mouse.flux_to(act.appears)
mouse.flux_to(act.enters_trap)
mouse.flux_to(act.trapped)
mouse.flux_to(act.dies)

a = mouse.history

In [None]:
from flux import StateMachine
from enum import Enum
from logging import getLogger, INFO, WARNING, basicConfig

logger = getLogger("flux")
logger.setLevel(WARNING)

#act = Enum("Action", "init appears runs enters_trap escapes trapped dies")

mouse = StateMachine(states={
                            act.init: {act.appears},
                            act.appears:{act.runs, act.enters_trap}, 
                             act.enters_trap:{act.trapped, act.escapes},
                            act.trapped:{act.escapes, act.dies},
                            act.escapes:{act.appears, act.enters_trap, act.runs},
                            act.runs:{act.appears, act.runs, act.enters_trap}
                            }, 
                     initial=act.init, 
                     name="mouse",
                    triggers={act.appears: lambda e: print("mouse appears.."),
                              act.enters_trap: lambda e: print("trap hits!"),
                              act.trapped: lambda e: print("mouse is trapped :("),
                              act.escapes: lambda e: print("mouse was able to escape!"),
                              act.runs: lambda e: print("mouse runs away!"),
                              act.dies: lambda e: print("mouse dies a horrible death in the trap :<")},
                     tracing=True
                    )

mouse.flux_to(act.appears)
mouse.flux_to(act.runs)
mouse.flux_to(act.enters_trap)
mouse.flux_to(act.escapes)
mouse.flux_to(act.appears)
mouse.flux_to(act.enters_trap)
mouse.flux_to(act.trapped)
mouse.flux_to(act.escapes)
mouse.flux_to(act.runs)
mouse.flux_to(act.appears)
mouse.flux_to(act.enters_trap)
mouse.flux_to(act.trapped)
mouse.flux_to(act.dies)

b = mouse.history

a == b

In [None]:
from flux import StateMachine
from enum import Enum
from bridge import cic

ES = Enum("ENetState", "connecting authenticating reconnecting connected disconnected")

class EProtocol(AMP):
    def __init__(self):
        self.nonce = None
        self.cnonce = None
        self.server_status = None
        #keep updating the status every 3 sec
        self.server_status_checker = None
        self.motd = None
        self.news = None
        self.url = None
        
class EFactory(ReconnectingClientFactory):
    """Factory for transport protocols to the entrance server."""
    
    maxDelay = 60
    protocol = EProtocol

    def __init__(self):
        self.state = StateMachine(
            {CState.connecting: {CState.authenticating, CState.connecting},
                CState.authenticating: {CState.connecting, CState.disconnected},
                CState.connected: {CState.reconnecting, CState.disconnected},
                CState.reconnecting: {CState.connected, CState.disconnected}},
            initial=CState.connecting,
                                )
        self.connection = None
        self.ticket = None
        self.eservername = None

    def clientConnectionLost(self, connector, reason):
        logger.error(reason)
        cic.

        if self.connection is None:
            return

        if self.connection.server_status_checker is not None:
            if self.connection.server_status_checker.running:
                self.gui.present(ES_Status("down"))
        self.connection = None
        self.retry()

    def disconnect(self):
        """Disconnect from the entrance server."""
        self.stopTrying()
        if self.connection is not None:
            if self.connection.transport is not None:
                self.connection.transport.loseConnection()
                self.connection = None

    def out_make_account(self, username, password, emailaddr):
        """Make the account creation request."""
        def cb(reply):
            """Called on successful account creation."""
            def redirect_to_login(task):
                """Redirect to login page and preset user/password."""
                cic.game.flux_to(GS.login)
                cic.gui.login.element("txtLogin").setText(username)
                cic.gui.login.element("txtPassword").setText(password)
                return task.done
                     
            self.gui.send(Info("Account created. Redirecting in 5 seconds..."))
            self.bridge.taskMgr.doMethodLater(5.0, redirect_to_login, 
                                              "redirect_to_login")
           
        def eb(error):
            """Called on error while account creation."""
            err = error.trap(KeyError, RuntimeWarning)
            logger.error(error)
            if err == KeyError:
                self.gui.send(Error("Account already exists."))
            if err == RuntimeWarning:
                self.gui.send(Error("Server hickup. Try later."))
            
        if self.connection is None:
            self.gui.send(Error("No connection."))
        else:
            if self.bridge.eclient.authmode == 1:  # Should be AUTHMODE_DIGEST
                pwdhash = sha1(password).hexdigest()
                self.connection.callRemote(net.U2ES_AccountCreationDigest,
                                           username=username,
                                           pwdhash=pwdhash,
                                           emailaddr=emailaddr,
                                           ).addCallbacks(cb, eb)

    def out_login_digest(self, username, pwdhash):
        """Send the login request to ES."""
        if self.connection is None:
            self.gui.send(Error("No connection."))
            return

        # http://en.wikipedia.org/wiki/Cryptographic_nonce
        nonce = self.connection.nonce
        cnonce = self.connection.cnonce

        loginhash = sha1(nonce + cnonce + pwdhash).hexdigest()

        def cb(reply):
            """Called on completing authentication."""
            if self.bridge.conf.saving_username:
                # FIXME: there is something strange going on why = doesn't work
                self.bridge.conf.set_username(username)
            if self.bridge.conf.saving_password:
                self.bridge.conf.set_password(pwdhash)
            
            self.ticket = sha1(username + pwdhash + str(reply["salt"])
                                                                ).hexdigest()
            logger.info("Authentication complete.")
            cs_host, cs_port = reply["cs_host"], reply["cs_port"]
            ws_host, ws_port = reply["ws_host"], reply["ws_port"]
            print "CS:", cs_host, cs_port, "WS:", ws_host, ws_port

            self.finish(cs_host, cs_port, ws_host, ws_port)

        def eb(error):
            """Called if authentication failed."""
            err = error.trap(KeyError, RuntimeWarning)

            if err == KeyError:
                self.gui.send(Error("Login incorrect or no such account."))

            if err == RuntimeWarning:
                self.gui.send(Error("Server problems. Please retry later."))

        self.connection.callRemote(net.U2ES_LoginDigest,
                        username=username,
                        cnonce=cnonce,
                        loginhash=loginhash).addCallbacks(cb, eb)
        
    def finish(self, cs_host, cs_port, ws_host, ws_port):
        """Finish the login stage."""
        # Precaution to avoid sending the ticket faster to wserver than 
        # eserver can
        sleep(1)

        if self.connection.server_status_checker is not None:
            if self.connection.server_status_checker.running:
                self.connection.server_status_checker.stop()

        self.bridge.spawn_clients(ws_host, ws_port, cs_host, cs_port)
        
        self.resetDelay()
        self.stopTrying()
        cic.g.request_switch(Playing)

        self.disconnect()

In [None]:
class EFactory(ReconnectingClientFactory):
    """Factory for transport protocols to the entrance server."""
    
    maxDelay = 60
    protocol = EProtocol

    def __init__(self, bridge):
        self.bridge = bridge
        
        self.connection = None
        self.ready4login = False
        self.ticket = None
        self.eservername = None
    
    @property
    def gui(self):
        return self.bridge.gui

    def clientConnectionLost(self, connector, reason):
        logger.error(reason)
        self.ready4login = False

        if self.connection is None:
            return

        if self.connection.server_status_checker is not None:
            if self.connection.server_status_checker.running:
                self.gui.present(ES_Status("down"))
        self.connection = None
        self.retry()

    def disconnect(self):
        """Disconnect from the entrance server."""
        self.stopTrying()
        if self.connection is not None:
            if self.connection.transport is not None:
                self.connection.transport.loseConnection()
                self.connection = None

    def out_make_account(self, username, password, emailaddr):
        """Make the account creation request."""
        def cb(reply):
            """Called on successful account creation."""
            
            def redirect_to_login(task):
                """Redirect to login page and preset user/password."""
                self.bridge.statemngr.request_switch(Login)
                self.bridge.gui.login.element("txtLogin").setText(username)
                self.bridge.gui.login.element("txtPassword").setText(password)
                
                return task.done
                     
            self.gui.send(Info("Account created. Redirecting in 5 seconds..."))
            self.bridge.taskMgr.doMethodLater(5.0, redirect_to_login, 
                                              "redirect_to_login")
           
        def eb(error):
            """Called on error while account creation."""
            err = error.trap(KeyError, RuntimeWarning)
            logger.error(error)
            if err == KeyError:
                self.gui.send(Error("Account already exists."))
            if err == RuntimeWarning:
                self.gui.send(Error("Server hickup. Try later."))
            
        if self.connection is None:
            self.gui.send(Error("No connection."))
        else:
            if self.bridge.eclient.authmode == 1:  # Should be AUTHMODE_DIGEST
                pwdhash = sha1(password).hexdigest()
                self.connection.callRemote(net.U2ES_AccountCreationDigest,
                                           username=username,
                                           pwdhash=pwdhash,
                                           emailaddr=emailaddr,
                                           ).addCallbacks(cb, eb)

    def out_login_digest(self, username, pwdhash):
        """Send the login request to ES."""
        if self.connection is None:
            self.gui.send(Error("No connection."))
            return

        # http://en.wikipedia.org/wiki/Cryptographic_nonce
        nonce = self.connection.nonce
        cnonce = self.connection.cnonce

        loginhash = sha1(nonce + cnonce + pwdhash).hexdigest()

        def cb(reply):
            """Called on completing authentication."""
            if self.bridge.conf.saving_username:
                # FIXME: there is something strange going on why = doesn't work
                self.bridge.conf.set_username(username)
            if self.bridge.conf.saving_password:
                self.bridge.conf.set_password(pwdhash)
            
            self.ticket = sha1(username + pwdhash + str(reply["salt"])
                                                                ).hexdigest()
            logger.info("Authentication complete.")
            cs_host, cs_port = reply["cs_host"], reply["cs_port"]
            ws_host, ws_port = reply["ws_host"], reply["ws_port"]
            print "CS:", cs_host, cs_port, "WS:", ws_host, ws_port

            self.finish(cs_host, cs_port, ws_host, ws_port)

        def eb(error):
            """Called if authentication failed."""
            err = error.trap(KeyError, RuntimeWarning)

            if err == KeyError:
                self.gui.send(Error("Login incorrect or no such account."))

            if err == RuntimeWarning:
                self.gui.send(Error("Server problems. Please retry later."))

        self.connection.callRemote(net.U2ES_LoginDigest,
                        username=username,
                        cnonce=cnonce,
                        loginhash=loginhash).addCallbacks(cb, eb)
        
    def finish(self, cs_host, cs_port, ws_host, ws_port):
        """Finish the login stage."""
        # Precaution to avoid sending the ticket faster to wserver than 
        #  eserver can
        sleep(1)

        if self.connection.server_status_checker is not None:
            if self.connection.server_status_checker.running:
                self.connection.server_status_checker.stop()

        self.bridge.spawn_clients(ws_host, ws_port, cs_host, cs_port)
        
        self.resetDelay()
        self.stopTrying()
        self.bridge.statemngr.request_switch(Playing)

        self.disconnect()

In [None]:

class EProtocol(AMP):
    """Class interfacing with entrance server."""-

    def connectionMade(self):
        logger.info("")
        self.factory.connection = self

    @net.ES2U_Greeting.responder
    def ES_greeting(self, name, motd, news, url,
                    e_netver, c_netver, w_netver, nonce):
        """Server sends me this on connectionMade."""

        self.factory.eservername = name
        self.nonce = nonce
        self.cnonce = str(time()) + str(randint(0, 65536))
        self.motd = motd
        self.news = news
        self.gui.present(ES_Name(name))
        self.gui.present(ES_MOTD(motd))
        self.gui.present(ES_News(news))
        self.url = url

        if not (E_NETVERSION == e_netver and
            W_NETVERSION == w_netver and
            C_NETVERSION == c_netver):
            self.update(e_netver, w_netver, c_netver)

        def status_cb(reply):
            """Callback on successful request of server status."""
            if reply["status"] == "up":
                self.factory.ready4login = True
            else:
                self.factory.ready4login = False
            self.gui.present(ES_Status(reply["status"]))

        def get_status():
            """Retrieve server status."""
            self.callRemote(net.U2ES_Status).addCallbacks(
                                        status_cb, logger.error)

        self.server_status_checker = LoopingCall(get_status)
        self.server_status_checker.start(3, now=True)

        def cb(reply):
            """Callback on success."""
            logger.info("got testchar prelogin data")

        def eb(error):
            """Callback on Error."""
            err = error.trap(RuntimeWarning)

            if err == RuntimeWarning:
                #call the PreLogin again when status goes up, but only once
                logger.error("can't get prelogin data with server not up.")

        self.callRemote(net.U2ES_PreLogin, 
                        char="testchar").addCallbacks(cb, eb)

        return {}

    @net.S2U_Kick.responder
    def kicked(self, reason):
        """ES kicks me from the system."""
        logger.critical(reason)
        self.factory.bridge.careen()
        return {}

    def update(self, e_ver, w_ver, c_ver):
        """The network protocols are out of sync.

        e_ver -- current netversion for ES protocol
        c_ver -- current netversion for WS protocol
        w_ver -- current netversion for CS protocol
        """
        logger.info("E:{0} W:{1} C:{2}".format(e_ver, w_ver, c_ver))a

In [None]:
from flux import StateMachine
from bridge import cic
from enum import Enum

GS = Enum("GameState", "init loading login signup playing exit")

cic.game = StateMachine({GS.init: {GS.loading},
                            GS.loading: {GS.login, GS.exit},
                            GS.login: {GS.signup, GS.playing, GS.exit},
                            GS.playing: {GS.exit},
                            GS.signup: {GS.login, GS.exit}},
                        initial=GS.init)

@cic.game.reg_enter(GS.loading)
def _(e):
    cic.render.hide()
    cic.render.setBackgroundColor(1, 1, 1)
    cic.render.setFrameRateMeter(False)

@cic.game.reg_enter(GS.login)
def _(e):
    cic.gui.enable()
    cic.gui.System.setGUISheet(self.gui.login.root)
    
cic.game.reg_exit(GS.login, lambda e: cic.gui.System.setGUISheet(None))

cic.game.reg_exit(GS.signup, lambda e: cic.gui.System.setGUISheet(self.gui.signup.root))
cic.game.reg_exit(GS.signup, lambda e: cic.gui.System.setGUISheet(None))

@cic.game.reg_enter(GS.login)
def _(e):
    cic.gui.enable()
    cic.gui.System.setGUISheet(self.gui.login.root)

cic.game.reg_exit(GS.login, lambda e: cic.gui.System.setGUISheet(None))

cic.game.flux_to(GS.loading)

cic.game.reg_enter(GS.signup, lambda e: cic.gui.System.setGUISheet(cic.gui.signup.root))
cic.game.reg_exit(GS.signup, lambda e: cic.gui.System.setGUISheet(None))

@cic.game.reg_enter(GS.playing)
def _(e):
    cic.setBackgroundColor(0.5, 0.5, 0.5)
    cic.render.show()
    cic.setFrameRateMeter(True)
    cic.gui.System.setGUISheet(cic.gui.ingame.root)

cic.game.reg_exit(GS.playing, lambda e: cic.gui.System.setGUISheet(None))        

@cic.game.reg_enter(GS.exit)
def _(e):
    cic.ctrls.ignoreAll()
    cic.statemngr.request_switch(None)
    cic.careen()

In [None]:
from flux import StateMachine
from enum import Enum
from collections import namedtuple

L = Enum("States", "go attention stop prepare dead")
lights = namedtuple("lightcombo", "green yellow red")

post = StateMachine(states={L.go: {L.attention},
                     L.attention: {L.stop},
                     L.stop: {L.prepare},
                     L.prepare: {L.go, L.dead}},
                    initial=L.go,
                    tracing=True)


post.reg_enter(L.go, lambda e: lights(True, False, False))
post.reg_enter(L.attention, lambda e: lights(False, True, False))

post.reg_enter(L.stop, lambda e: lights(False, False, True))
post.reg_exit(L.stop, lambda e: print("enterRed(self, '%s', '%s')" % (e.old, e.new)))

post.reg_enter(L.prepare, lambda e: lights(False, True, True))

post.flux_to(L.attention)
post.flux_to(L.go)
post.flux_to(L.stop)
post.flux_to(L.prepare)
post.flux_to(L.dead)
post.flux_to(L.go)
post.history

In [None]:
class SM:
    def __init__(self):
        self.enters = {}
        
    def reg_enter(self, *args):
        if len(args) == 2:
            state, func = args
            self.enters[state] = func
        if len(args) == 1:
            state = args[0]
            def decorator(func):
                self.enters[state] = func
            return decorator

sm = SM()

@sm.reg_enter("foo")
def _(e):
    print("foo", e)
    
sm.reg_enter("asdf", lambda e: print(e))

sm.enters["asdf"](3)
sm.enters["foo"](5)

In [None]:

from direct.fsm import FSM 
from direct.fsm import State 
class NewStyle(FSM.FSM): 
    def enterRed(self, oldState, newState): 
        print "enterRed(self, '%s', '%s')" % (oldState, newState) 

    def filterRed(self, request, args): 
        print "filterRed(self, '%s', %s)" % (request, args) 
        if request == 'advance': 
            return 'Green' 
        return self.defaultFilter(request, args) 

    def exitRed(self, oldState, newState): 
        print "exitRed(self, '%s', '%s')" % (oldState, newState) 

    def enterYellow(self, oldState, newState): 
        print "enterYellow(self, '%s', '%s')" % (oldState, newState) 

    def filterYellow(self, request, args): 
        print "filterYellow(self, '%s', %s)" % (request, args) 
        if request == 'advance': 
            return 'Red' 
        return self.defaultFilter(request, args) 

    def exitYellow(self, oldState, newState): 
        print "exitYellow(self, '%s', '%s')" % (oldState, newState) 

    def enterGreen(self, oldState, newState): 
        print "enterGreen(self, '%s', '%s')" % (oldState, newState) 

    def filterGreen(self, request, args): 
        print "filterGreen(self, '%s', %s)" % (request, args) 
        if request == 'advance': 
            return 'Yellow' 
        return self.defaultFilter(request, args) 

    def exitGreen(self, oldState, newState): 
        print "exitGreen(self, '%s', '%s')" % (oldState, newState)

In [None]:
from flux import StateMachine, Collector
from enum import Enum

IS = Enum("IS", "on off dead")

s1 = StateMachine(states={IS.off: {IS.on, IS.off},
                         IS.on: {IS.off, IS.on},
                         },
                         initial=IS.off)

s1.reg_exit(IS.off, lambda e: print("not off anymore"))
s1.reg_enter(IS.on, lambda e: print("getting it on"))

@s1.reg_transition(IS.off, IS.on)
def _(e):
    print("not sure yet... oh man, what suspension!")

s1.transitions
s1.flux_to(IS.on)

In [None]:
from flux import StateMachine, Collector
from enum import Enum

IS = Enum("IS", "on off dead")

s1 = StateMachine(states={IS.off: {IS.on, IS.off},
                         IS.on: {IS.off, IS.on},
                         },
                    initial=IS.off,
                    name="s1")

s2 = s1.clone(name="s2")
s3 = s1.clone(name="s3")

light = Collector(states={IS.off: {IS.on, IS.off, IS.dead},
                          IS.on: {IS.off, IS.on, IS.dead},
                         },
                    initial=IS.off,
                    name="light"
                    )

light[[(s1, IS.on), (s2, IS.on), (s3, IS.on)]] = IS.on

light.reg_enter(IS.on, lambda e: print("light's on!"))
light.reg_enter(IS.off, lambda e: print("lights' off!"))
light.reg_enter(IS.dead, lambda e: print("light's gone!"))

light()
s1.flux_to(IS.on)
s2.flux_to(IS.on)
s3.flux_to(IS.on)
light()

light.flux_to(IS.off)
light()
s2.flux_to(IS.off)
light()
light.flux_to(IS.dead)
s2.flux_to(IS.on)
light()