# How generators and contexts work

In [1]:
class ContextGen():
    def __init__(self, input_list):
        self._l = input_list
        
    def __enter__(self):
        return iter(self)
    
    def __exit__(self, type, value, traceback):
        print("Running clean-up code")
        
    def __iter__(self):
        yield from self._l

In [2]:
with ContextGen("abcd") as gen:
    for x in gen:
        print(x)

a
b
c
d
Running clean-up code


In [21]:
cg = ContextGen("352")
with cg:
    for x in cg:
        print(x)

3
5
2
Running clean-up code


# Using threads

In [3]:
import threading, queue

In [4]:
class CallbackInterface():
    def notify(self, data):
        pass
    
    def atend(self):
        pass
    
class ExceptionCapturingThread(threading.Thread):
    def __init__(self, func, queue):
        super().__init__()
        self._func = func
        self._queue = queue
        
    def run(self):
        try:
            self._func()
        except Exception as e:
            self._queue.put(e)

class CallbackToQueue(CallbackInterface):
    def __init__(self, func=None):
        self._queue = queue.Queue(maxsize=1)
        self._terminate = False
        if func is not None:
            self.prime(func)
    
    def notify(self, data):
        if self._terminate:
            self._queue.put(StopIteration)
            raise Exception("Forcing early termination")
        self._queue.put(data)

    def atend(self):
        self.notify(StopIteration)
    
    def __enter__(self):
        def ourfunc():
            self._func(self)
        self._thread = ExceptionCapturingThread(ourfunc, self._queue)
        self._thread.start()
        return iter(self)
    
    def __exit__(self, type, value, traceback):
        self._terminate = True
        while self._thread.is_alive():
            try:
                datum = self._queue.get(timeout=0.1)
            except queue.Empty:
                pass
    
    def __iter__(self):
        while True:
            try:
                datum = self._queue.get(timeout=1)
                if datum is StopIteration:
                    break
                if isinstance(datum, Exception):
                    raise datum
                yield datum
            except queue.Empty:
                if not self._thread.is_alive():
                    break

    def prime(self, func):
        self._func = func

This shows how data pushed to a handler can now be processed from a generator

In [5]:
def push_to_callback(handler):
    for x in "1234abcd":
        handler.notify(x)
    handler.atend()

In [6]:
with CallbackToQueue(push_to_callback) as generator:
    for x in generator:
        print(x)

1
2
3
4
a
b
c
d


If the code consuming from the generator ends, then the context manager closes the thread properly

In [7]:
with CallbackToQueue(push_to_callback) as generator:
    for i, x in enumerate(generator):
        print(i, x)
        if i >= 3:
            break

0 1
1 2
2 3
3 4


If the callback code raises an exception, this gets pushed to us from the generator.

In [8]:
def push_to_callback(handler):
    for x in "1234abcd":
        if x == "b":
            raise ValueError()
        handler.notify(x)
    handler.atend()

with CallbackToQueue(push_to_callback) as generator:
    for x in generator:
        print(x)

1
2
3
4
a


ValueError: 

# Metaprogramming

In [13]:
import inspect

In [18]:
# Python 3 needs "isfunction" not "ismethod"
class_name = CallbackInterface

d = dict()
for name, _ in inspect.getmembers(class_name, predicate=inspect.isfunction):
    
    d[name] = 
    
ourclass = type("OurClass", (class_name,), {})


SyntaxError: unexpected EOF while parsing (<ipython-input-18-046dd9ba7fdb>, line 7)

In [20]:
inspect.signature(CallbackInterface.notify)

<Signature (self, data)>