# 状态模式

设计行为状态而变的对象

In [4]:
import collections
import random

random.seed(917) # Not truly random for ease of regression testing


def main():
    totalCounter = Counter()
    carCounter = Counter("cars")
    commercialCounter = Counter("vans", "trucks")

    multiplexer = Multiplexer()
    for eventName, callback in (("cars", carCounter),
            ("vans", commercialCounter), ("trucks", commercialCounter)):
        multiplexer.connect(eventName, callback)
        multiplexer.connect(eventName, totalCounter)

    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.DORMANT
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 dormant events: cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))
    
    multiplexer.state = Multiplexer.ACTIVE
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))
    

def generate_random_events(count):
    vehicles = (("cars",) * 11) + (("vans",) * 3) + ("trucks",)
    for _ in range(count):
        yield Event(random.choice(vehicles), random.randint(1, 3))



class Counter:

    def __init__(self, *names):
        self.anonymous = not bool(names)
        if self.anonymous:
            self.count = 0
        else:
            for name in names:
                if not name.isidentifier():
                    raise ValueError("names must be valid identifiers")
                setattr(self, name, 0)


    def __call__(self, event):
        if self.anonymous:
            self.count += event.count
        else:
            count = getattr(self, event.name)
            setattr(self, event.name, count + event.count)


class Event:

    def __init__(self, name, count=1):
        if not name.isidentifier():
            raise ValueError("names must be valid identifiers")
        self.name = name
        self.count = count


class Multiplexer:

    ACTIVE, DORMANT = ("ACTIVE", "DORMANT")

    def __init__(self):
        self.callbacksForEvent = collections.defaultdict(list)
        self.state = Multiplexer.ACTIVE


    def connect(self, eventName, callback):
        if self.state == Multiplexer.ACTIVE:
            self.callbacksForEvent[eventName].append(callback)


    def disconnect(self, eventName, callback=None):
        if self.state == Multiplexer.ACTIVE:
            if callback is None:
                del self.callbacksForEvent[eventName]
            else:
                self.callbacksForEvent[eventName].remove(callback)


    def send(self, event):
        if self.state == Multiplexer.ACTIVE:
            for callback in self.callbacksForEvent.get(event.name, ()):
                callback(event)


if __name__ == "__main__":
    main()


After 100 active events:  cars=150 vans=42 trucks=14 total=206
After 100 dormant events: cars=150 vans=42 trucks=14 total=206
After 100 active events:  cars=303 vans=83 trucks=30 total=416


In [5]:
import collections
import random

random.seed(917) # Not truly random for ease of regression testing


def main():
    totalCounter = Counter()
    carCounter = Counter("cars")
    commercialCounter = Counter("trucks", "vans")

    multiplexer = Multiplexer()
    for eventName, callback in (("cars", carCounter),
            ("vans", commercialCounter), ("trucks", commercialCounter)):
        multiplexer.connect(eventName, callback)
        multiplexer.connect(eventName, totalCounter)

    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.DORMANT
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 dormant events: cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))
    
    multiplexer.state = Multiplexer.ACTIVE
    for event in generate_random_events(100):
        multiplexer.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))
    

def generate_random_events(count):
    vehicles = (("cars",) * 11) + (("vans",) * 3) + ("trucks",)
    for _ in range(count):
        yield Event(random.choice(vehicles), random.randint(1, 3))


class Counter:

    def __init__(self, *names):
        self.anonymous = not bool(names)
        if self.anonymous:
            self.count = 0
        else:
            for name in names:
                if not name.isidentifier():
                    raise ValueError("names must be valid identifiers")
                setattr(self, name, 0)


    def __call__(self, event):
        if self.anonymous:
            self.count += event.count
        else:
            count = getattr(self, event.name)
            setattr(self, event.name, count + event.count)


class Event:

    def __init__(self, name, count=1):
        if not name.isidentifier():
            raise ValueError("names must be valid identifiers")
        self.name = name
        self.count = count


class Multiplexer:

    ACTIVE, DORMANT = ("ACTIVE", "DORMANT")

    def __init__(self):
        self.callbacksForEvent = collections.defaultdict(list)
        self.state = Multiplexer.ACTIVE


    @property
    def state(self):
        return (Multiplexer.ACTIVE if self.send == self.__active_send
                else Multiplexer.DORMANT)


    @state.setter
    def state(self, state):
        if state == Multiplexer.ACTIVE:
            self.connect = self.__active_connect
            self.disconnect = self.__active_disconnect
            self.send = self.__active_send
        else:
            self.connect = lambda *args: None
            self.disconnect = lambda *args: None
            self.send = lambda *args: None


    def __active_connect(self, eventName, callback):
        self.callbacksForEvent[eventName].append(callback)


    def __active_disconnect(self, eventName, callback=None):
        if callback is None:
            del self.callbacksForEvent[eventName]
        else:
            self.callbacksForEvent[eventName].remove(callback)


    def __active_send(self, event):
        for callback in self.callbacksForEvent.get(event.name, ()):
            callback(event)


if __name__ == "__main__":
    main()


After 100 active events:  cars=150 vans=42 trucks=14 total=206
After 100 dormant events: cars=150 vans=42 trucks=14 total=206
After 100 active events:  cars=303 vans=83 trucks=30 total=416


In [7]:
import collections
import random
from Qtrac import coroutine

random.seed(917) # Not truly random for ease of regression testing


def main():
    totalCounter = Counter()
    carCounter = Counter("cars")
    commercialCounter = Counter("trucks", "vans")

    multiplexer = Multiplexer()
    pipeline = multiplexer.pipeline()
    for eventName, callback in (("cars", carCounter),
            ("vans", commercialCounter), ("trucks", commercialCounter)):
        multiplexer.connect(eventName, callback)
        multiplexer.connect(eventName, totalCounter)

    for event in generate_random_events(100):
        pipeline.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))

    multiplexer.state = Multiplexer.DORMANT
    for event in generate_random_events(100):
        pipeline.send(event)
    print("After 100 dormant events: cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))
    
    multiplexer.state = Multiplexer.ACTIVE
    for event in generate_random_events(100):
        pipeline.send(event)
    print("After 100 active events:  cars={} vans={} trucks={} total={}"
            .format(carCounter.cars, commercialCounter.vans,
                    commercialCounter.trucks, totalCounter.count))
    

def generate_random_events(count):
    vehicles = (("cars",) * 11) + (("vans",) * 3) + ("trucks",)
    for _ in range(count):
        yield Event(random.choice(vehicles), random.randint(1, 3))


class Counter:

    def __init__(self, *names):
        self.anonymous = not bool(names)
        if self.anonymous:
            self.count = 0
        else:
            for name in names:
                if not name.isidentifier():
                    raise ValueError("names must be valid identifiers")
                setattr(self, name, 0)


    def __call__(self, event):
        if self.anonymous:
            self.count += event.count
        else:
            count = getattr(self, event.name)
            setattr(self, event.name, count + event.count)


class Event:

    def __init__(self, name, count=1):
        if not name.isidentifier():
            raise ValueError("names must be valid identifiers")
        self.name = name
        self.count = count


class Multiplexer:

    ACTIVE, DORMANT = ("ACTIVE", "DORMANT")

    def __init__(self):
        self.callbacksForEvent = collections.defaultdict(list)
        self.state = Multiplexer.ACTIVE


    @coroutine
    def pipeline(self):
        while True:
            event = (yield)
            if self.state == Multiplexer.ACTIVE:
                for callback in self.callbacksForEvent.get(event.name, ()):
                    callback(event)


    def connect(self, eventName, callback):
        if self.state == Multiplexer.ACTIVE:
            self.callbacksForEvent[eventName].append(callback)


    def disconnect(self, eventName, callback=None):
        if self.state == Multiplexer.ACTIVE:
            if callback is None:
                del self.callbacksForEvent[eventName]
            else:
                self.callbacksForEvent[eventName].remove(callback)


if __name__ == "__main__":
    main()


After 100 active events:  cars=150 vans=42 trucks=14 total=206
After 100 dormant events: cars=150 vans=42 trucks=14 total=206
After 100 active events:  cars=303 vans=83 trucks=30 total=416
