In [1]:
# Move to the temporary working directory.
# NOTE(review): presumably this is where the example modules imported by later
# cells (coroutine, follow, cosax, buses, ...) live -- confirm.  The path is
# machine-specific and must be adjusted before re-running elsewhere.
from os import chdir
chdir("/var/folders/qg/pwxx_zfd07x_rpw143q33c840000gn/T/coroutines/")

# Introduction to Generators and Coroutines

<a href="countdown.py">countdown.py</a>.  A trivially simple generator function.

In [2]:
# countdown.py
#
# A simple generator function

def countdown(n):
    """Yield n, n-1, ..., 1, printing a message before and after the count."""
    print("Counting down from", n)
    remaining = n
    while True:
        if not remaining > 0:
            break
        yield remaining
        remaining = remaining - 1
    print("Done counting down")

# Example use
if __name__ == '__main__':
    for i in countdown(10):
        print(i)

Counting down from 10
10
9
8
7
6
5
4
3
2
1
Done counting down


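<a href="follow.py">follow.py</a>.  A generator that follows a log file like Unix 'tail -f'.  To see it work, first run the program logsim.py in the background; it simulates an active server log by writing entries to a file (<tt>access-log</tt>).  Leave this program running in the background for the
 next few parts.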

In [None]:
# follow.py
#
# A generator that follows a log file like Unix 'tail -f'.
#
# Note: To see this example work, you need to apply to 
# an active server log file.  Run the program "logsim.py"
# in the background to simulate such a file.  This program
# will write entries to a file "access-log".

import time

import time
def follow(thefile):
    """Generate lines appended to an open file, like Unix 'tail -f'.

    Seeks to the end of ``thefile`` first and never terminates: whenever
    readline() returns nothing, it sleeps 0.1 seconds and tries again.
    """
    thefile.seek(0, 2)         # whence=2: jump to the end of the file
    while True:
        line = thefile.readline()
        if line:
            yield line
        else:
            time.sleep(0.1)    # nothing new yet; back off briefly

# Example use
if __name__ == '__main__':
    # follow() never returns, so the only way out is an exception such as
    # KeyboardInterrupt; the with-block still closes the file in that case.
    with open("access-log") as logfile:
        for line in follow(logfile):
            # Each line keeps its trailing newline; appending a space
            # (end=' ') would indent every following line by one column.
            print(line, end='')

<a href="pipeline.py">pipeline.py</a>.  An example of using generators to set up a simple processing pipeline.
 Print all server log entries containing the word 'python'.

In [None]:
# pipeline.py
#
# An example of setting up a processing pipeline with generators

def grep(pattern,lines):
    """Yield, in order, every item of ``lines`` that contains ``pattern``."""
    for candidate in lines:
        if pattern not in candidate:
            continue
        yield candidate

if __name__ == '__main__':
    from follow import follow

    # Set up a processing pipe : tail -f | grep python
    # The with-block closes the log file when the (endless) loop is interrupted.
    with open("access-log") as logfile:
        loglines = follow(logfile)
        pylines  = grep("python",loglines)

        # Pull results out of the processing pipeline
        for line in pylines:
            # Lines already end with a newline, so append nothing
            print(line, end='')

<a href="grep.py">grep.py</a>.  A first example of a coroutine function.  This function receives lines
 and prints out those that contain a substring..

In [15]:
# grep.py
#
# A very simple coroutine

def grep(pattern):
    """Coroutine that prints each received line containing ``pattern``.

    Every yield hands back "LIGNE: " followed by the most recently received
    line; the priming next() gets "LIGNE: Vide" because nothing has been
    received yet.
    """
    print("Looking for %s" % pattern)
    line = "Vide"
    while True:
        line = yield f"LIGNE: {line}"
        if pattern not in line:
            continue
        print(line, end=' ')

# Example use
if __name__ == '__main__':
    g = grep("python")
    next(g)
    g.send("Yeah, but no, but yeah, but no")
    g.send("A series of tubes")
    g.send("python generators rock!")

Looking for python
python generators rock! 

In [7]:
g = grep("python")

In [8]:
next(g)

Looking for python


In [11]:
g.send("python generators rock!")

python generators rock! 

<a href="coroutine.py">coroutine.py</a>.  A decorator that removes the need to call <tt>next()</tt> when
 starting a coroutine.

In [17]:
# coroutine.py
#
# A decorator function that takes care of starting a coroutine
# automatically on call.

def coroutine(func):
    """Decorator that primes a generator-based coroutine when it is created.

    Calling the decorated function builds the generator and advances it to
    its first ``yield``, so the caller can ``send()`` to it right away.
    The returned wrapper keeps the wrapped function's name and docstring.
    """
    from functools import wraps   # local import keeps this block self-contained

    @wraps(func)
    def start(*args,**kwargs):
        cr = func(*args,**kwargs)
        next(cr)                  # run up to the first yield
        return cr
    return start

# Example use
if __name__ == '__main__':
    @coroutine
    def grep(pattern):
        print("Looking for %s" % pattern)
        while True:
            line = (yield)
            if pattern in line:
                print(line, end=' ')

    g = grep("python")
    # Notice how you don't need a next() call here
    g.send("Yeah, but no, but yeah, but no")
    g.send("A series of tubes")
    g.send("python generators rock!")

Looking for python
python generators rock! 

<a href="grepclose.py">grepclose.py</a>.  A coroutine that catches the <tt>close()</tt> operation.

In [18]:
# grepclose.py
#
# A coroutine that catches the close() operation

from coroutine import coroutine

@coroutine
def grep(pattern):
    """Print received lines containing ``pattern``; say goodbye on close().

    close() raises GeneratorExit at the paused yield, which is caught here
    to print a farewell message before the coroutine finishes.
    """
    print("Looking for %s" % pattern)
    try:
        while True:
            line = (yield)
            if pattern in line:
                # The sender supplies the newline; appending a space here
                # would indent whatever is printed next.
                print(line, end='')
    except GeneratorExit:
        print("Going away. Goodbye")

# Example use
if __name__ == '__main__':
    g = grep("python")
    g.send("Yeah, but no, but yeah, but no\n")
    g.send("A series of tubes\n")
    g.send("python generators rock!\n")
    g.close()

Looking for python
python generators rock!
 Going away. Goodbye


<a href="bogus.py">bogus.py</a>. An example of a bogus generator that generates and receives values (not a recommended
 coding style)..

In [25]:
# bogus.py
#
# Bogus example of a generator that produces and receives values

def countdown(n):
    """Count down from n to 0, letting send() overwrite the counter.

    Each yield produces the current count.  A non-None value sent in
    becomes the new count; otherwise the count drops by one.
    """
    print("Counting down from", n)
    while n >= 0:
        sent = yield n
        n = n - 1 if sent is None else sent

# The holy grail countdown
c = countdown(5)
for x in c:
    print(x)
    if x == 5:
        c.send(3)

Counting down from 5
5
2
1
0


# Coroutines, Pipelines, and Dataflow

<a href="cofollow.py">cofollow.py</a>.  A simple example of feeding data from a data source into a coroutine. This
 mirrors the 'tail -f' example from earlier..

In [None]:
# cofollow.py
#
# A simple example showing how to hook up a pipeline with
# coroutines.   To run this, you will need a log file.
# Run the program logsim.py in the background to get a data
# source.

from coroutine import coroutine

# A data source.  This is not a coroutine, but it sends
# data into one (target)

import time
def follow(thefile, target):
    """Push lines appended to ``thefile`` into ``target``, forever.

    Seeks to the end of the file, then polls it: each new line goes to
    ``target.send()``; when there is none, sleeps 0.1 seconds and retries.
    """
    thefile.seek(0, 2)         # whence=2: start at the end of the file
    while True:
        line = thefile.readline()
        if line:
            target.send(line)
        else:
            time.sleep(0.1)    # nothing new yet

# A sink.  A coroutine that receives data

@coroutine
def printer():
    """Sink coroutine: write every received line to stdout unchanged.

    The lines sent by follow() come from readline() and keep their trailing
    newline, so nothing is appended (a trailing space would indent the
    next line).
    """
    while True:
        line = (yield)
        print(line, end='')

# Example use
if __name__ == '__main__':
    # follow() never returns; the with-block closes the file when the run
    # is interrupted.
    with open("access-log") as f:
        follow(f,printer())

<a href="copipe.py">copipe.py</a>.  An example of setting up a processing pipeline with coroutines..

In [None]:
# copipe.py
#
# A simple example showing how to hook up a pipeline with
# coroutines.   To run this, you will need a log file.
# Run the program logsim.py in the background to get a data
# source.

from coroutine import coroutine

# A data source.  This is not a coroutine, but it sends
# data into one (target)

import time
def follow(thefile, target):
    """Feed lines appended to ``thefile`` into the ``target`` coroutine.

    Never returns.  Starts at the end of the file; sleeps 0.1 seconds
    whenever readline() has nothing new.
    """
    thefile.seek(0, 2)         # whence=2: start at the end of the file
    while True:
        line = thefile.readline()
        if line:
            target.send(line)
        else:
            time.sleep(0.1)

# A filter.

@coroutine
def grep(pattern,target):
    """Filter coroutine: forward to ``target`` only lines containing ``pattern``."""
    while True:
        candidate = (yield)              # wait for the next line
        if pattern not in candidate:
            continue
        target.send(candidate)           # pass it down the pipeline

# A sink.  A coroutine that receives data

@coroutine
def printer():
    """Sink coroutine: write every received line to stdout unchanged.

    Lines arriving through follow() come from readline() and already end
    with a newline, so nothing is appended after them.
    """
    while True:
        line = (yield)
        print(line, end='')

# Example use
if __name__ == '__main__':
    # The with-block closes the log file when the endless follow() loop
    # is interrupted.
    with open("access-log") as f:
        follow(f,
               grep('python',
               printer()))

<a href="cobroadcast.py">cobroadcast.py</a>.  An example of a coroutine broadcaster.  This fans a data stream out to
 multiple targets..

In [None]:
# cobroadcast.py
#
# An example of broadcasting a data stream onto multiple coroutine targets.

from coroutine import coroutine

# A data source.  This is not a coroutine, but it sends
# data into one (target)

import time
def follow(thefile, target):
    """Tail ``thefile`` and send each newly appended line to ``target``.

    Runs forever: positions at end of file, then alternates between
    forwarding a line and sleeping 0.1 seconds when none is available.
    """
    thefile.seek(0, 2)         # whence=2: start at the end of the file
    while True:
        line = thefile.readline()
        if line:
            target.send(line)
        else:
            time.sleep(0.1)

# A filter.
@coroutine
def grep(pattern,target):
    """Forward to ``target`` each received line that contains ``pattern``."""
    while True:
        candidate = (yield)
        if pattern not in candidate:
            continue
        target.send(candidate)

# A sink.  A coroutine that receives data
@coroutine
def printer():
    """Sink coroutine: write every received line to stdout unchanged.

    Incoming lines already end with a newline, so nothing is appended;
    a trailing space would indent the next printed line.
    """
    while True:
        line = (yield)
        print(line, end='')

# Broadcast a stream onto multiple targets
@coroutine
def broadcast(targets):
    """Fan-out coroutine: send every received item to each of ``targets`` in order."""
    while True:
        payload = (yield)
        for sink in targets:
            sink.send(payload)

# Example use
if __name__ == '__main__':
    # The with-block closes the log file when the endless follow() loop
    # is interrupted.
    with open("access-log") as f:
        follow(f,
           broadcast([grep('python',printer()),
                      grep('ply',printer()),
                      grep('swig',printer())])
               )

<a href="cobroadcast2.py">cobroadcast2.py</a>.  An example of broadcasting with a slightly different data handling pattern..

In [None]:
# cobroadcast2.py
#
# An example of broadcasting a data stream onto multiple coroutine targets.
# This example shows "fan-in"---a situation where multiple coroutines
# send to the same target.

from coroutine import coroutine

# A data source.  This is not a coroutine, but it sends
# data into one (target)

import time
def follow(thefile, target):
    """Tail ``thefile`` and send each newly appended line to ``target``.

    Never returns.  Begins at the end of the file and sleeps 0.1 seconds
    each time readline() comes back empty.
    """
    thefile.seek(0, 2)         # whence=2: start at the end of the file
    while True:
        line = thefile.readline()
        if line:
            target.send(line)
        else:
            time.sleep(0.1)

# A filter.
@coroutine
def grep(pattern,target):
    """Forward to ``target`` each received line that contains ``pattern``."""
    while True:
        candidate = (yield)
        if pattern not in candidate:
            continue
        target.send(candidate)

# A sink.  A coroutine that receives data
@coroutine
def printer():
    """Sink coroutine: write every received line to stdout unchanged.

    Incoming lines already end with a newline, so nothing is appended;
    a trailing space would indent the next printed line.
    """
    while True:
        line = (yield)
        print(line, end='')

# Broadcast a stream onto multiple targets
@coroutine
def broadcast(targets):
    """Fan-out coroutine: send every received item to each of ``targets`` in order."""
    while True:
        payload = (yield)
        for sink in targets:
            sink.send(payload)

# Example use
if __name__ == '__main__':
    # The with-block closes the log file when the endless follow() loop
    # is interrupted.
    with open("access-log") as f:
        p = printer()      # one shared sink: all three filters fan in to it
        follow(f,
           broadcast([grep('python',p),
                      grep('ply',p),
                      grep('swig',p)])
               )

<a href="benchmark.py">benchmark.py</a>.  A small benchmark comparing the performance of sending data into a coroutine
 vs. sending data into an instance of a class..

In [None]:
# benchmark.py
#
# A micro benchmark comparing the performance of sending messages into
# a coroutine vs. sending messages into an object

# An object
class GrepHandler(object):
    """Object-based counterpart of the grep coroutine.

    send(line) forwards ``line`` to ``target.send()`` when it contains
    ``pattern``.
    """

    def __init__(self,pattern, target):
        self.pattern, self.target = pattern, target

    def send(self,line):
        """Forward ``line`` to the target if it contains the pattern."""
        if self.pattern not in line:
            return
        self.target.send(line)

# A coroutine
from coroutine import coroutine

@coroutine
def grep(pattern,target):
    """Forward to ``target`` each received line that contains ``pattern``."""
    while True:
        candidate = (yield)
        if pattern not in candidate:
            continue
        target.send(candidate)

# A null-sink to send data
@coroutine
def null():
    """Sink coroutine that accepts items forever and does nothing with them."""
    while True:
        item = (yield)     # received value is deliberately unused

# A benchmark
line = 'python is nice'
p1   = grep('python',null())          # Coroutine
p2   = GrepHandler('python',null())   # Object

from timeit import timeit

print("coroutine:", timeit("p1.send(line)",
                          "from __main__ import line, p1"))

print("object:", timeit("p2.send(line)",
                        "from __main__ import line, p2"))

# Coroutines and Event Dispatching

<a href="basicsax.py">basicsax.py</a>.  A very basic example of the SAX API for parsing XML documents (does
 not involve coroutines)..

In [None]:
# basicsax.py
#
# A very simple example illustrating the SAX XML parsing interface

import xml.sax

class MyHandler(xml.sax.ContentHandler):
    """SAX content handler that prints one line per parsing event."""

    @staticmethod
    def _show(label, detail):
        # Single place where an event line is written to stdout
        print(label, detail)

    def startElement(self,name,attrs):
        self._show("startElement", name)

    def endElement(self,name):
        self._show("endElement", name)

    def characters(self,text):
        # Show at most the first 40 characters of the text's repr
        preview = repr(text)
        self._show("characters", preview[:40])

xml.sax.parse("allroutes.xml",MyHandler())

<a href="cosax.py">cosax.py</a>.  An example that pushes SAX events into a coroutine..

In [None]:
# cosax.py
#
# An example showing how to push SAX events into a coroutine target

import xml.sax

class EventHandler(xml.sax.ContentHandler):
    """SAX content handler that turns parser callbacks into coroutine events.

    Each callback sends one tuple to ``target``:
      ('start', (name, attribute_dict)), ('text', text) or ('end', name).
    """

    def __init__(self,target):
        super().__init__()      # let ContentHandler set up its own state
        self.target = target

    def startElement(self,name,attrs):
        # Copy the attributes through the public mapping interface instead
        # of reaching into the private attrs._attrs dict.
        self.target.send(('start',(name,dict(attrs.items()))))

    def characters(self,text):
        self.target.send(('text',text))

    def endElement(self,name):
        self.target.send(('end',name))

# example use
if __name__ == '__main__':
    from coroutine import *

    @coroutine
    def printer():
        while True:
            event = (yield)
            print(event)

    xml.sax.parse("allroutes.xml",
                  EventHandler(printer()))

<a href="buses.py">buses.py</a>.  An example of parsing and filtering XML data with a 
 series of connected coroutines..

In [None]:
# buses.py
#
# An example of setting up an event handling pipeline with coroutines
# and XML parsing.

from coroutine import *

@coroutine
def buses_to_dicts(target):
    """Collapse the event stream of each <bus> element into one dict.

    Receives (event, value) tuples.  Between the start and the end of a
    'bus' element, the text of every inner element is joined and stored
    under that element's name; the finished dict is sent to ``target``.
    """
    while True:
        event, value = (yield)
        # Ignore everything until a <bus> element opens
        if not (event == 'start' and value[0] == 'bus'):
            continue
        busdict = { }
        fragments = []
        # Collect inner elements until the matching </bus>
        while True:
            event, value = (yield)
            if event == 'start':
                fragments = []
            elif event == 'text':
                fragments.append(value)
            elif event == 'end':
                if value == 'bus':
                    target.send(busdict)
                    break
                busdict[value] = "".join(fragments)

@coroutine
def filter_on_field(fieldname,value,target):
    """Forward to ``target`` only the dicts whose ``fieldname`` entry equals ``value``."""
    while True:
        record = (yield)
        if not record.get(fieldname) == value:
            continue
        target.send(record)

@coroutine
def bus_locations():
    """Sink coroutine: print each received bus dict as one comma-separated line."""
    row_format = ("%(route)s,%(id)s,\"%(direction)s\","
                  "%(latitude)s,%(longitude)s")
    while True:
        bus = (yield)
        print(row_format % bus)

# Example 
if __name__ == '__main__':
    import xml.sax
    from cosax import EventHandler

    xml.sax.parse("allroutes.xml",
              EventHandler(
                   buses_to_dicts(
                   filter_on_field("route","22",
                   filter_on_field("direction","North Bound",
                   bus_locations())))
              ))

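<a href="coexpat.py">coexpat.py</a>.  An example of pushing XML events generated by the low-level expat library into coroutines.  Compare with <tt>cosax.py</tt> above.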

In [None]:
# coexpat.py
#
# An example of pushing XML events generated by the low-level expat
# XML library into coroutines.

import xml.parsers.expat

def expat_parse(f,target):
    """Parse XML from ``f`` with expat, sending events to ``target``.

    ``f`` must be a binary file object (ParseFile needs read() to return
    bytes).  Events sent to ``target``:
      ('start', (name, attrs)), ('end', name) and ('text', data).
    """
    parser = xml.parsers.expat.ParserCreate()
    parser.buffer_size = 65536
    parser.buffer_text = True      # deliver character data in large chunks
    # No 'returns_unicode' switch here: that was a Python 2 attribute.
    # Python 3 parsers always deliver str, and assigning the attribute
    # raises AttributeError.
    parser.StartElementHandler = \
       lambda name,attrs: target.send(('start',(name,attrs)))
    parser.EndElementHandler = \
       lambda name: target.send(('end',name))
    parser.CharacterDataHandler = \
       lambda data: target.send(('text',data))
    parser.ParseFile(f)

# Example.  This uses the bus processing code from earlier with no changes.

if __name__ == '__main__':
    from buses import *

    # expat's ParseFile() requires read() to return bytes, so the file is
    # opened in binary mode; the with-block closes it afterwards.
    with open("allroutes.xml", "rb") as xmlfile:
        expat_parse(xmlfile,
                buses_to_dicts(
                filter_on_field("route","22",
                filter_on_field("direction","North Bound",
                bus_locations()))))

<tt>python setup.py
 build_ext --inplace</tt>.  Note: The main focus of this class is not
 C extension building, so you might have to mess around with this to get
 it to work.  Key point: you can dispatch data into a coroutine
 directly from C.

<a href="iterbus.py">iterbus.py</a>.  An example of incremental
 XML parsing with the ElementTree module (for comparison with
 coroutines).

In [None]:
# iterbus.py
#
# An example of incremental XML parsing with the ElementTree library

# xml.etree.cElementTree was removed in Python 3.9; xml.etree.ElementTree
# provides the same iterparse().
from xml.etree.ElementTree import iterparse

for event,elem in iterparse("allroutes.xml",('start','end')):
    if event == 'start' and elem.tag == 'buses':
        # Remember the container element so finished <bus> children can be dropped
        buses = elem
    elif event == 'end' and elem.tag == 'bus':
        # A complete <bus> element: map each child tag to its text
        busdict = dict((child.tag,child.text)
                        for child in elem)
        if (busdict['route'] == '22' and
            busdict['direction'] == 'North Bound'):
            print("%(id)s,%(route)s,\"%(direction)s\","\
                  "%(latitude)s,%(longitude)s" % busdict)
        # Detach the processed element so the tree does not keep growing
        buses.remove(elem)

# From Data Processing to Concurrent Programming

<a href="cothread.py">cothread.py</a>.  A thread object that runs
 a coroutine..

In [None]:
# cothread.py
#
# A thread object that runs a coroutine inside it.  Messages get sent
# via a Queue object

from threading import Thread
from queue import Queue
from coroutine import *

@coroutine
def threaded(target):
    """Run ``target`` in its own thread, fed through a queue.

    Items sent to this coroutine are put on a Queue; a worker thread takes
    them off and sends them to ``target``.  Closing this coroutine queues
    GeneratorExit as a sentinel, which makes the worker close ``target``
    and stop.
    """
    messages = Queue()

    def run_target():
        # Worker loop: forward queued items until the sentinel arrives
        while True:
            item = messages.get()
            if item is not GeneratorExit:
                target.send(item)
                continue
            target.close()
            return

    Thread(target=run_target).start()
    try:
        while True:
            messages.put((yield))
    except GeneratorExit:
        messages.put(GeneratorExit)

# Example use

if __name__ == '__main__':
    import xml.sax
    from cosax import EventHandler
    from buses import *

    xml.sax.parse("allroutes.xml", EventHandler(
                    buses_to_dicts(
                    threaded(
                         filter_on_field("route","22",
                         filter_on_field("direction","North Bound",
                         bus_locations()))
                    ))))

<a href="coprocess.py">coprocess.py</a>.  An example that runs <a href="busproc.py">busproc.py</a> in a subprocess and sends
 data to it.

In [None]:
# coprocess.py
#
# An example of running a coroutine in a subprocess connected by a pipe

import pickle as pickle
from coroutine import *

@coroutine
def sendto(f):
    """Sink coroutine: pickle each received item to ``f`` and flush it.

    ``f`` is closed when the coroutine itself is closed.
    """
    try:
        while True:
            item = (yield)
            pickle.dump(item,f)
            f.flush()
    except GeneratorExit:
        # close() raises GeneratorExit (not StopIteration) at the paused
        # yield; catching it is what lets the file actually get closed.
        f.close()

def recvfrom(f,target):
    """Unpickle items from ``f`` and send each to ``target`` until end of file.

    When EOFError is raised, ``target`` is closed and the function returns.
    NOTE(review): pickle.load executes whatever the stream describes, so
    ``f`` should only ever be a trusted source.
    """
    try:
        while True:
            target.send(pickle.load(f))
    except EOFError:
        target.close()


# Example use
if __name__ == '__main__':
    import sys
    import xml.sax
    from cosax import EventHandler
    from buses import *

    import subprocess
    # Start the child with the interpreter running this code; a bare
    # 'python' may be missing or may be a different Python version.
    p = subprocess.Popen([sys.executable,'busproc.py'],
                         stdin=subprocess.PIPE)

    xml.sax.parse("allroutes.xml",
                  EventHandler(
                          buses_to_dicts(
                          sendto(p.stdin))))

    # Closing the pipe gives the child end-of-file; then wait for it to finish.
    p.stdin.close()
    p.wait()

In [None]:
# busproc.py
#
# Bus processor.  This runs as a subprocess from the coprocess.py example

import sys
from coprocess import recvfrom
from buses import *

# pickle.load() needs a binary stream; sys.stdin is text, so read from the
# underlying byte buffer instead.
recvfrom(sys.stdin.buffer,
         filter_on_field("route","22",
         filter_on_field("direction","North Bound",
         bus_locations())))

<a href="cocrash.py">cocrash.py</a>. An example of crashing a
 thread by having it send data into an already executing coroutine.
 Since this example depends on thread synchronization, it may
 occasionally "work" by accident.  Run it a few more times..

In [None]:
# cocrash.py
#
# An example of hooking coroutines up in a way that might cause a potential
# crash.   Basically, there are two threads feeding data into the
# printer() coroutine.    

from cobroadcast import *
from cothread import threaded

p = printer()
target = broadcast([threaded(grep('foo',p)),
                    threaded(grep('bar',p))])

# Adjust the count if this doesn't cause a crash
for i in range(10):
    target.send("foo is nice\n")
    target.send("bar is bad\n")

del target
del p

# Writing an Operating System

<a href="pyos1.py">pyos1.py</a>. A simple object representing a "task."  It is a wrapper around
 a coroutine..

In [None]:
# ------------------------------------------------------------
# pyos1.py  -  The Python Operating System
# 
# Step 1: Tasks
# ------------------------------------------------------------

# This object encapsulates a running task.

class Task(object):
    """Wrapper around a coroutine that gives it a numeric task id."""

    taskid = 0      # id given to the most recently created task

    def __init__(self,target):
        Task.taskid = Task.taskid + 1
        self.tid, self.target, self.sendval = Task.taskid, target, None

    def run(self):
        """Send ``sendval`` into the coroutine and return what it yields next."""
        pending = self.sendval
        return self.target.send(pending)

# ------------------------------------------------------------
#                       == Example ==
# ------------------------------------------------------------
if __name__ == '__main__':
    
    # A simple generator/coroutine function
    def foo():
        print("Part 1")
        yield
        print("Part 2")
        yield

    t1 = Task(foo())
    print("Running foo()")
    t1.run()
    print("Resuming foo()")
    t1.run()

    # If you call t1.run() one more time, you get StopIteration.
    # Uncomment the next statement to see that.

    # t1.run()

<a href="pyos2.py">pyos2.py</a>. A simple task scheduler that alternates between
 tasks whenever they yield..

In [None]:
# ------------------------------------------------------------
# pyos2.py  -  The Python Operating System
#
# Step 2: A Scheduler
# ------------------------------------------------------------


# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task(object):
    """Wrapper around a coroutine that gives it a numeric task id."""

    taskid = 0      # id given to the most recently created task

    def __init__(self,target):
        Task.taskid = Task.taskid + 1
        self.tid, self.target, self.sendval = Task.taskid, target, None

    def run(self):
        """Send ``sendval`` into the coroutine and return what it yields next."""
        pending = self.sendval
        return self.target.send(pending)

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue

class Scheduler(object):
    """Round-robin scheduler: a FIFO ready queue plus a tid -> Task map."""

    def __init__(self):
        self.ready, self.taskmap = Queue(), {}

    def new(self,target):
        """Wrap ``target`` in a Task, register and queue it; return its tid."""
        created = Task(target)
        tid = created.tid
        self.taskmap[tid] = created
        self.schedule(created)
        return tid

    def schedule(self,task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def mainloop(self):
        """Run ready tasks in turn, re-queueing each one after it yields.

        A finishing task is not handled here: the StopIteration raised by
        task.run() propagates out of this loop.
        """
        while self.taskmap:
            current = self.ready.get()
            current.run()
            self.schedule(current)

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    
    # Two tasks
    def foo():
        while True:
            print("I'm foo")
            yield

    def bar():
        while True:
            print("I'm bar")
            yield    
        
    # Run them
    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()

<a href="taskcrash.py">taskcrash.py</a>. An example showing how the scheduler crashes
 if one of the tasks terminates.

In [None]:
# taskcrash.py
#
# An example that shows how the initial scheduler doesn't handle
# task termination correctly.

from pyos2 import Scheduler

def foo():
    for i in range(10):
        print("I'm foo")
        yield
def bar():
     while True:
         print("I'm bar")
         yield

sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()

<a href="pyos3.py">pyos3.py</a>. An improved scheduler that properly handles
 task termination..

In [None]:
# ------------------------------------------------------------
# pyos3.py  -  The Python Operating System
#
# Step 3: Added handling for task termination
# ------------------------------------------------------------

# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task(object):
    """Wrapper around a coroutine that gives it a numeric task id."""

    taskid = 0      # id given to the most recently created task

    def __init__(self,target):
        Task.taskid = Task.taskid + 1
        self.tid, self.target, self.sendval = Task.taskid, target, None

    def run(self):
        """Send ``sendval`` into the coroutine and return what it yields next."""
        pending = self.sendval
        return self.target.send(pending)

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue

class Scheduler(object):
    """Round-robin scheduler that removes tasks when they finish."""

    def __init__(self):
        self.ready, self.taskmap = Queue(), {}

    def new(self,target):
        """Wrap ``target`` in a Task, register and queue it; return its tid."""
        created = Task(target)
        tid = created.tid
        self.taskmap[tid] = created
        self.schedule(created)
        return tid

    def exit(self,task):
        """Report that ``task`` terminated and drop it from the task map."""
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def schedule(self,task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def mainloop(self):
        """Run ready tasks in turn until the task map is empty.

        A task that raises StopIteration is retired through exit(); any
        other task is put back on the ready queue.
        """
        while self.taskmap:
            current = self.ready.get()
            try:
                current.run()
            except StopIteration:
                self.exit(current)
            else:
                self.schedule(current)

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    def foo():
        for i in range(10):
            print("I'm foo")
            yield

    def bar():
        for i in range(5):
            print("I'm bar")
            yield

    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()

<a href="pyos4.py">pyos4.py</a>. A scheduler with support for "system calls."

In [None]:
# ------------------------------------------------------------
# pyos4.py  -  The Python Operating System
#
# Step 4: Introduce the idea of a "System Call"
# ------------------------------------------------------------

# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task(object):
    """Wrapper around a coroutine that gives it a numeric task id."""

    taskid = 0      # id given to the most recently created task

    def __init__(self,target):
        Task.taskid = Task.taskid + 1
        self.tid, self.target, self.sendval = Task.taskid, target, None

    def run(self):
        """Send ``sendval`` into the coroutine and return what it yields next."""
        pending = self.sendval
        return self.target.send(pending)

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue

class Scheduler(object):
    """Round-robin scheduler that also services SystemCall requests."""

    def __init__(self):
        self.ready, self.taskmap = Queue(), {}

    def new(self,target):
        """Wrap ``target`` in a Task, register and queue it; return its tid."""
        created = Task(target)
        tid = created.tid
        self.taskmap[tid] = created
        self.schedule(created)
        return tid

    def exit(self,task):
        """Report that ``task`` terminated and drop it from the task map."""
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def schedule(self,task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def mainloop(self):
        """Run ready tasks in turn until the task map is empty.

        If a task yields a SystemCall, the call is given the task and this
        scheduler and then handled; rescheduling is left to the call.  A
        task raising StopIteration is retired; otherwise it is re-queued.
        """
        while self.taskmap:
            current = self.ready.get()
            try:
                outcome = current.run()
                is_syscall = isinstance(outcome, SystemCall)
                if is_syscall:
                    outcome.task, outcome.sched = current, self
                    outcome.handle()
            except StopIteration:
                self.exit(current)
            else:
                if not is_syscall:
                    self.schedule(current)

# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------

class SystemCall(object):
    """Base class for requests a task can yield to the scheduler."""

    def handle(self):
        """Do nothing; subclasses override this to carry out the call."""
        return None

# Return a task's ID number
class GetTid(SystemCall):
    """System call that hands the calling task its own task id."""

    def handle(self):
        caller = self.task
        caller.sendval = caller.tid       # delivered on the task's next run
        self.sched.schedule(caller)

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':

    def foo():
        mytid = yield GetTid()
        for i in range(5):
            print("I'm foo", mytid)
            yield

    def bar():
        mytid = yield GetTid()
        for i in range(10):
            print("I'm bar", mytid)
            yield

    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()

<a href="pyos5.py">pyos5.py</a>. A scheduler with system calls for basic task
 creation and termination..

In [None]:
# ------------------------------------------------------------
# pyos5.py  -  The Python Operating System
#
# Step 5: Added system calls for simple task management
# ------------------------------------------------------------

# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task(object):
    """Wrapper around a coroutine that gives it a numeric task id."""

    taskid = 0      # id given to the most recently created task

    def __init__(self,target):
        Task.taskid = Task.taskid + 1
        self.tid, self.target, self.sendval = Task.taskid, target, None

    def run(self):
        """Send ``sendval`` into the coroutine and return what it yields next."""
        pending = self.sendval
        return self.target.send(pending)

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue

class Scheduler(object):
    """Round-robin scheduler that also services SystemCall requests."""

    def __init__(self):
        self.ready, self.taskmap = Queue(), {}

    def new(self,target):
        """Wrap ``target`` in a Task, register and queue it; return its tid."""
        created = Task(target)
        tid = created.tid
        self.taskmap[tid] = created
        self.schedule(created)
        return tid

    def exit(self,task):
        """Report that ``task`` terminated and drop it from the task map."""
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def schedule(self,task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def mainloop(self):
        """Run ready tasks in turn until the task map is empty.

        If a task yields a SystemCall, the call is given the task and this
        scheduler and then handled; rescheduling is left to the call.  A
        task raising StopIteration is retired; otherwise it is re-queued.
        """
        while self.taskmap:
            current = self.ready.get()
            try:
                outcome = current.run()
                is_syscall = isinstance(outcome, SystemCall)
                if is_syscall:
                    outcome.task, outcome.sched = current, self
                    outcome.handle()
            except StopIteration:
                self.exit(current)
            else:
                if not is_syscall:
                    self.schedule(current)

# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------

class SystemCall(object):
    """Base class for requests a task can yield to the scheduler."""

    def handle(self):
        """Do nothing; subclasses override this to carry out the call."""
        return None

# Return a task's ID number
class GetTid(SystemCall):
    """System call that hands the calling task its own task id."""

    def handle(self):
        caller = self.task
        caller.sendval = caller.tid       # delivered on the task's next run
        self.sched.schedule(caller)

# Create a new task
class NewTask(SystemCall):
    """System call that starts a new task and returns its id to the caller."""

    def __init__(self,target):
        self.target = target

    def handle(self):
        caller = self.task
        # The new task is queued first; its tid becomes the caller's result
        caller.sendval = self.sched.new(self.target)
        self.sched.schedule(caller)

# Kill a task
class KillTask(SystemCall):
    """System call that closes another task's coroutine.

    The caller receives True if a task with the given id was found in the
    scheduler's task map, and False otherwise.
    """

    def __init__(self,tid):
        self.tid = tid

    def handle(self):
        victim = self.sched.taskmap.get(self.tid)
        found = bool(victim)
        if found:
            victim.target.close()
        self.task.sendval = found
        self.sched.schedule(self.task)

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    def foo():
        mytid = yield GetTid()
        while True:
            print("I'm foo", mytid)
            yield

    def main():
        child = yield NewTask(foo())    # Launch new task
        for i in range(5):
            yield
        yield KillTask(child)           # Kill the task
        print("main done")

    sched = Scheduler()
    sched.new(main())
    sched.mainloop()

<a href="pyos6.py">pyos6.py</a>. A scheduler that adds support for task waiting..

In [None]:
# ------------------------------------------------------------
# pyos6.py  -  The Python Operating System
#
# Added support for task waiting
# ------------------------------------------------------------

# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task(object):
    """Wrapper that pairs a coroutine with a task ID and a pending send value.

    Attributes:
        tid:     unique ID taken from the class-wide ``taskid`` counter.
        target:  the coroutine (generator) this task drives.
        sendval: value delivered to the coroutine on the next ``run()``.
    """
    taskid = 0
    def __init__(self,target):
        Task.taskid += 1
        self.tid     = Task.taskid   # Task ID
        self.target  = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        """Resume the coroutine and return whatever it yields next.

        The pending ``sendval`` is consumed by this call: it is sent once
        and then reset to None, so a value set for one yield is not
        replayed into every later yield.

        Raises:
            StopIteration: when the coroutine finishes.
        """
        value, self.sendval = self.sendval, None
        return self.target.send(value)

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue

class Scheduler(object):
    """Task scheduler with a FIFO ready queue and support for exit-waiting."""
    def __init__(self):
        self.taskmap = {}            # tid -> Task for every live task
        self.ready   = Queue()       # Tasks that can run right now

        # Tasks waiting for other tasks to exit (tid -> list of waiters)
        self.exit_waiting = {}

    def new(self,target):
        """Wrap ``target`` in a Task, register and schedule it; return its ID."""
        created = Task(target)
        tid = created.tid
        self.taskmap[tid] = created
        self.schedule(created)
        return tid

    def exit(self,task):
        """Drop a finished task and reschedule everything waiting on it."""
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]
        # Notify other tasks waiting for exit
        waiters = self.exit_waiting.pop(task.tid,[])
        for waiter in waiters:
            self.schedule(waiter)

    def waitforexit(self,task,waittid):
        """Park ``task`` until task ``waittid`` exits.

        Returns False (and parks nothing) when ``waittid`` is not a live task.
        """
        if waittid not in self.taskmap:
            return False
        self.exit_waiting.setdefault(waittid,[]).append(task)
        return True

    def schedule(self,task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def mainloop(self):
        """Run ready tasks one step at a time until no tasks remain."""
        while self.taskmap:
            current = self.ready.get()
            try:
                outcome = current.run()
                is_syscall = isinstance(outcome,SystemCall)
                if is_syscall:
                    # The system call decides whether the task is rescheduled
                    outcome.task  = current
                    outcome.sched = self
                    outcome.handle()
            except StopIteration:
                self.exit(current)
            else:
                if not is_syscall:
                    self.schedule(current)

# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------

class SystemCall:
    """Base class for requests that a task yields to the scheduler.

    Before calling ``handle()`` the scheduler's main loop sets ``task``
    (the yielding task) and ``sched`` (the scheduler) on the instance.
    """
    def handle(self):
        """Service the request; the base implementation does nothing."""
        return None

# Return a task's ID number
class GetTid(SystemCall):
    """System call that gives the calling task its own task ID."""
    def handle(self):
        caller = self.task
        # The ID is stored as the value to send into the caller next time
        caller.sendval = caller.tid
        self.sched.schedule(caller)

# Create a new task
class NewTask(SystemCall):
    """System call that asks the scheduler to start ``target`` as a new task."""
    def __init__(self,target):
        self.target = target

    def handle(self):
        caller = self.task
        # The new task's ID is what the caller receives from its yield
        caller.sendval = self.sched.new(self.target)
        self.sched.schedule(caller)

# Kill a task
class KillTask(SystemCall):
    """System call that closes the coroutine of the task with ID ``tid``.

    The caller receives True if the task was found in the scheduler's
    task map and False otherwise.
    """
    def __init__(self,tid):
        self.tid = tid

    def handle(self):
        victim = self.sched.taskmap.get(self.tid)
        found = bool(victim)
        if found:
            # A closed generator raises StopIteration on its next send(),
            # which is what makes the scheduler retire the task.
            victim.target.close()
        self.task.sendval = found
        self.sched.schedule(self.task)

# Wait for a task to exit
class WaitTask(SystemCall):
    """System call that suspends the caller until task ``tid`` exits.

    The caller receives True if it was parked on a live task, or False
    if ``tid`` did not name a live task.
    """
    def __init__(self,tid):
        self.tid = tid

    def handle(self):
        caller = self.task
        parked = self.sched.waitforexit(caller,self.tid)
        caller.sendval = parked
        if parked:
            # Scheduler.exit() reschedules the caller when the target exits
            return
        # If waiting for a non-existent task,
        # return immediately without waiting
        self.sched.schedule(caller)

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    def foo():
        """Child task: print a message five times, yielding after each."""
        for _ in range(5):
            print("I'm foo")
            yield

    def main():
        """Parent task: start foo and block until it has exited."""
        child_tid = yield NewTask(foo())
        print("Waiting for child")
        yield WaitTask(child_tid)
        print("Child done")

    sched = Scheduler()
    sched.new(main())
    sched.mainloop()

<a href="echobad.py">echobad.py</a>. A broken example of trying to use coroutines
 to implement a multitasking network server.  It breaks due to blocking I/O operations.

In [None]:
# echobad.py
#
# A flawed implementation of a concurrent echo server using our task scheduler.
# If you run this, you will notice that it freezes up right away.  This
# is because the accept() operation in the server is blocking.  Since we're
# not using threads, this blocks the operation of everything!

from pyos6 import *
from socket import *

def handle_client(client,addr):
    """Echo everything received on ``client`` back to it, then close it.

    recv() and send() are called directly with no yield in between, so the
    whole conversation runs inside a single scheduler step.
    """
    print("Connection from", addr)
    data = client.recv(65536)
    while data:
        client.send(data)
        data = client.recv(65536)
    client.close()
    print("Client closed")
    yield           # Make the function a generator/coroutine

def server(port):
    """Accept connections on ``port`` and start a handler task for each one."""
    print("Server starting")
    listener = socket(AF_INET,SOCK_STREAM)
    listener.bind(("",port))
    listener.listen(5)
    while True:
        # accept() is called without waiting for readiness first, so it
        # blocks the calling thread until a client connects
        conn, peer = listener.accept()
        yield NewTask(handle_client(conn,peer))

def alive():
    """Heartbeat task: print a message every time it is resumed."""
    while True:
        print("I'm alive!")
        yield

# Register the heartbeat task and the echo server, then run the scheduler.
# alive() is added first, so it is the first task put on the ready queue.
sched = Scheduler()
sched.new(alive())
sched.new(server(45000))
sched.mainloop()

<a href="pyos7.py">pyos7.py</a>. A scheduler that adds support for I/O waiting.

In [None]:
# ------------------------------------------------------------
# pyos7.py  -  The Python Operating System
#
# Step 6 : I/O Waiting Support added
# ------------------------------------------------------------

# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task(object):
    """Wrapper that pairs a coroutine with a task ID and a pending send value.

    Attributes:
        tid:     unique ID taken from the class-wide ``taskid`` counter.
        target:  the coroutine (generator) this task drives.
        sendval: value delivered to the coroutine on the next ``run()``.
    """
    taskid = 0
    def __init__(self,target):
        Task.taskid += 1
        self.tid     = Task.taskid   # Task ID
        self.target  = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        """Resume the coroutine and return whatever it yields next.

        The pending ``sendval`` is consumed by this call: it is sent once
        and then reset to None. Without the reset, a value set for one
        yield would be replayed into later yields that set nothing, such
        as a plain ``yield`` or a ReadWait/WriteWait request.

        Raises:
            StopIteration: when the coroutine finishes.
        """
        value, self.sendval = self.sendval, None
        return self.target.send(value)

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue
import select

class Scheduler(object):
    """Task scheduler with a FIFO ready queue, exit-waiting and I/O-waiting."""
    def __init__(self):
        self.taskmap = {}            # tid -> Task for every live task
        self.ready   = Queue()       # Tasks that can run right now

        # Tasks waiting for other tasks to exit (tid -> list of waiters)
        self.exit_waiting = {}

        # I/O waiting (file descriptor -> waiting task)
        self.read_waiting = {}
        self.write_waiting = {}

    def new(self,target):
        """Wrap ``target`` in a Task, register and schedule it; return its ID."""
        created = Task(target)
        tid = created.tid
        self.taskmap[tid] = created
        self.schedule(created)
        return tid

    def exit(self,task):
        """Drop a finished task and reschedule everything waiting on it."""
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]
        # Notify other tasks waiting for exit
        waiters = self.exit_waiting.pop(task.tid,[])
        for waiter in waiters:
            self.schedule(waiter)

    def waitforexit(self,task,waittid):
        """Park ``task`` until task ``waittid`` exits.

        Returns False (and parks nothing) when ``waittid`` is not a live task.
        """
        if waittid not in self.taskmap:
            return False
        self.exit_waiting.setdefault(waittid,[]).append(task)
        return True

    # I/O waiting
    def waitforread(self,task,fd):
        """Park ``task`` until descriptor ``fd`` is reported readable."""
        self.read_waiting[fd] = task

    def waitforwrite(self,task,fd):
        """Park ``task`` until descriptor ``fd`` is reported writable."""
        self.write_waiting[fd] = task

    def iopoll(self,timeout):
        """Reschedule every parked task whose descriptor select() reports ready.

        ``timeout`` is passed straight to select.select(). Nothing is
        polled when no task is waiting on I/O.
        """
        if not (self.read_waiting or self.write_waiting):
            return
        readable, writable, _ = select.select(self.read_waiting,
                                              self.write_waiting,[],timeout)
        for fd in readable:
            self.schedule(self.read_waiting.pop(fd))
        for fd in writable:
            self.schedule(self.write_waiting.pop(fd))

    def iotask(self):
        """Coroutine that polls for I/O each time it is scheduled."""
        while True:
            # Wait indefinitely only when the ready queue is empty;
            # otherwise just check and move on.
            self.iopoll(None if self.ready.empty() else 0)
            yield

    def schedule(self,task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def mainloop(self):
        """Start the I/O polling task, then run ready tasks one step at a time."""
        self.new(self.iotask())
        while self.taskmap:
            current = self.ready.get()
            try:
                outcome = current.run()
                is_syscall = isinstance(outcome,SystemCall)
                if is_syscall:
                    # The system call decides whether the task is rescheduled
                    outcome.task  = current
                    outcome.sched = self
                    outcome.handle()
            except StopIteration:
                self.exit(current)
            else:
                if not is_syscall:
                    self.schedule(current)

# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------

class SystemCall:
    """Base class for requests that a task yields to the scheduler.

    Before calling ``handle()`` the scheduler's main loop sets ``task``
    (the yielding task) and ``sched`` (the scheduler) on the instance.
    """
    def handle(self):
        """Service the request; the base implementation does nothing."""
        return None

# Return a task's ID number
class GetTid(SystemCall):
    """System call that gives the calling task its own task ID."""
    def handle(self):
        caller = self.task
        # The ID is stored as the value to send into the caller next time
        caller.sendval = caller.tid
        self.sched.schedule(caller)

# Create a new task
class NewTask(SystemCall):
    """System call that asks the scheduler to start ``target`` as a new task."""
    def __init__(self,target):
        self.target = target

    def handle(self):
        caller = self.task
        # The new task's ID is what the caller receives from its yield
        caller.sendval = self.sched.new(self.target)
        self.sched.schedule(caller)

# Kill a task
class KillTask(SystemCall):
    """System call that closes the coroutine of the task with ID ``tid``.

    The caller receives True if the task was found in the scheduler's
    task map and False otherwise.
    """
    def __init__(self,tid):
        self.tid = tid

    def handle(self):
        victim = self.sched.taskmap.get(self.tid)
        found = bool(victim)
        if found:
            # A closed generator raises StopIteration on its next send(),
            # which is what makes the scheduler retire the task.
            victim.target.close()
        self.task.sendval = found
        self.sched.schedule(self.task)

# Wait for a task to exit
class WaitTask(SystemCall):
    """System call that suspends the caller until task ``tid`` exits.

    The caller receives True if it was parked on a live task, or False
    if ``tid`` did not name a live task.
    """
    def __init__(self,tid):
        self.tid = tid

    def handle(self):
        caller = self.task
        parked = self.sched.waitforexit(caller,self.tid)
        caller.sendval = parked
        if parked:
            # Scheduler.exit() reschedules the caller when the target exits
            return
        # If waiting for a non-existent task,
        # return immediately without waiting
        self.sched.schedule(caller)

# Wait for reading
class ReadWait(SystemCall):
    """System call that parks the caller until ``f`` is readable."""
    def __init__(self,f):
        self.f = f

    def handle(self):
        # The caller is not rescheduled here; Scheduler.iopoll() puts it
        # back on the ready queue once the descriptor is reported readable.
        descriptor = self.f.fileno()
        self.sched.waitforread(self.task,descriptor)

# Wait for writing
class WriteWait(SystemCall):
    """System call that parks the caller until ``f`` is writable."""
    def __init__(self,f):
        self.f = f

    def handle(self):
        # The caller is not rescheduled here; Scheduler.iopoll() puts it
        # back on the ready queue once the descriptor is reported writable.
        descriptor = self.f.fileno()
        self.sched.waitforwrite(self.task,descriptor)

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------

# Run the script echogood.py to see this work

<a href="echogood.py">echogood.py</a>. A slightly modified version of the echo server
 above that properly handles blocking I/O.

In [None]:
# echogood.py
#
# Another attempt at an echo server.  This one works because
# of the I/O waiting operations that suspend the tasks when there
# is no data available.  Compare to echobad.py

from socket import *
from pyos7 import *

def handle_client(client,addr):
    """Echo data received on ``client`` back to it until the peer closes.

    The coroutine yields ReadWait before each recv() and WriteWait before
    each send(), so it is suspended while the socket is not ready.

    Args:
        client: connected socket object for this client.
        addr:   peer address, used only for the log message.
    """
    print("Connection from", addr)
    try:
        while True:
            yield ReadWait(client)
            data = client.recv(65536)
            if not data:
                break
            # send() may accept only part of the buffer; keep waiting and
            # sending until every byte has been written.
            while data:
                yield WriteWait(client)
                sent = client.send(data)
                data = data[sent:]
    finally:
        # Also runs if the coroutine is closed early (e.g. by KillTask),
        # so the socket is not leaked.
        client.close()
    print("Client closed")

def server(port):
    """Accept connections on ``port`` and start a handler task for each one."""
    print("Server starting")
    listener = socket(AF_INET,SOCK_STREAM)
    listener.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    listener.bind(("",port))
    listener.listen(5)
    while True:
        # Suspend until the listening socket is readable before accepting
        yield ReadWait(listener)
        conn, peer = listener.accept()
        yield NewTask(handle_client(conn,peer))

def alive():
    """Heartbeat task: print a message every time it is resumed."""
    while True:
        print("I'm alive!")
        yield

# Register the heartbeat task and the echo server, then run the scheduler.
# alive() is added first, so it is the first task put on the ready queue.
sched = Scheduler()
sched.new(alive())
sched.new(server(45000))
sched.mainloop()

<a href="echogood2.py">echogood2.py</a>. An echo server without the "I'm alive" messages.

In [None]:
# echogood2.py
#
# A working concurrent echo server

from socket import *
from pyos7 import *

def handle_client(client,addr):
    """Echo data received on ``client`` back to it until the peer closes.

    The coroutine yields ReadWait before each recv() and WriteWait before
    each send(), so it is suspended while the socket is not ready.

    Args:
        client: connected socket object for this client.
        addr:   peer address, used only for the log message.
    """
    print("Connection from", addr)
    try:
        while True:
            yield ReadWait(client)
            data = client.recv(65536)
            if not data:
                break
            # send() may accept only part of the buffer; keep waiting and
            # sending until every byte has been written.
            while data:
                yield WriteWait(client)
                sent = client.send(data)
                data = data[sent:]
    finally:
        # Also runs if the coroutine is closed early (e.g. by KillTask),
        # so the socket is not leaked.
        client.close()
    print("Client closed")

def server(port):
    """Accept connections on ``port`` and start a handler task for each one."""
    print("Server starting")
    listener = socket(AF_INET,SOCK_STREAM)
    listener.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    listener.bind(("",port))
    listener.listen(1024)
    while True:
        # Suspend until the listening socket is readable before accepting
        yield ReadWait(listener)
        conn, peer = listener.accept()
        yield NewTask(handle_client(conn,peer))

# Register the echo server on port 45000 as the only user task and run
# the scheduler.
sched = Scheduler()
sched.new(server(45000))
sched.mainloop()

# The Problem with Subroutines and the Stack

<a href="trampoline.py">trampoline.py</a>.  An example of "trampolining" between coroutines.

In [None]:
# trampoline.py
#
# A simple example of trampolining between coroutines

# A subroutine
def add(x,y):
    """Subroutine coroutine: yield ``x + y`` as its single result."""
    total = x + y
    yield total

# A function that calls a subroutine
def main():
    """Yield an add(2,2) subroutine, print the value sent back, then yield."""
    subroutine = add(2,2)
    r = yield subroutine
    print(r)
    yield

def run():
    """Drive main() by hand, relaying the subroutine's result back to it."""
    caller = main()
    # An example of a "trampoline"
    callee = caller.send(None)      # main() hands back the add() generator
    answer = callee.send(None)      # run the subroutine up to its yield
    caller.send(answer)             # deliver the result to main()

# Run the trampoline demo; main() prints the value produced by add(2,2)
run()

<a href="pyos8.py">pyos8.py</a>.  The OS with an enhanced Task
 object that allows coroutines to transfer control to other coroutines
 like subroutines.

In [None]:
# ------------------------------------------------------------
# pyos8.py  -  The Python Operating System
#
# Step 7 : Support for coroutine trampolines (subroutines)
# ------------------------------------------------------------

import types

# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task(object):
    """A coroutine plus a stack of suspended callers (a trampoline).

    Attributes:
        tid:     unique ID taken from the class-wide ``taskid`` counter.
        target:  the coroutine currently being run.
        sendval: value delivered to ``target`` on the next send.
        stack:   coroutines suspended while a subroutine they yielded runs.
    """
    taskid = 0
    def __init__(self,target):
        Task.taskid += 1
        self.tid     = Task.taskid   # Task ID
        self.target  = target        # Target coroutine
        self.sendval = None          # Value to send
        self.stack   = []            # Call stack

    # Run a task until it hits the next yield statement
    def run(self):
        """Run the task until it yields a system call or a plain value.

        What the current coroutine yields decides the next step:
          * a SystemCall is returned to the scheduler;
          * a generator is treated as a subroutine call: the current
            coroutine is pushed on the stack and the generator runs;
          * any other value is a subroutine's result and is sent to the
            caller popped from the stack, or, when the stack is empty,
            ends this step by returning None.

        A subroutine that finishes without yielding a result resumes its
        caller with None.

        Raises:
            StopIteration: when the outermost coroutine finishes.
        """
        while True:
            try:
                # Consume the pending value so it is delivered exactly once
                # and is not replayed into later yields that set nothing
                # (a plain yield, or a ReadWait/WriteWait request).
                value, self.sendval = self.sendval, None
                result = self.target.send(value)
                if isinstance(result,SystemCall): return result
                if isinstance(result,types.GeneratorType):
                    # Subroutine call: it starts with sendval None
                    self.stack.append(self.target)
                    self.target  = result
                else:
                    if not self.stack: return
                    # Subroutine result: hand it to the caller
                    self.sendval = result
                    self.target  = self.stack.pop()
            except StopIteration:
                if not self.stack: raise
                # Subroutine returned without a result; sendval is None
                self.target = self.stack.pop()

# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
from queue import Queue
import select

class Scheduler(object):
    """Task scheduler with a FIFO ready queue, exit-waiting and I/O-waiting."""
    def __init__(self):
        self.taskmap = {}            # tid -> Task for every live task
        self.ready   = Queue()       # Tasks that can run right now

        # Tasks waiting for other tasks to exit (tid -> list of waiters)
        self.exit_waiting = {}

        # I/O waiting (file descriptor -> waiting task)
        self.read_waiting = {}
        self.write_waiting = {}

    def new(self,target):
        """Wrap ``target`` in a Task, register and schedule it; return its ID."""
        created = Task(target)
        tid = created.tid
        self.taskmap[tid] = created
        self.schedule(created)
        return tid

    def exit(self,task):
        """Drop a finished task and reschedule everything waiting on it."""
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]
        # Notify other tasks waiting for exit
        waiters = self.exit_waiting.pop(task.tid,[])
        for waiter in waiters:
            self.schedule(waiter)

    def waitforexit(self,task,waittid):
        """Park ``task`` until task ``waittid`` exits.

        Returns False (and parks nothing) when ``waittid`` is not a live task.
        """
        if waittid not in self.taskmap:
            return False
        self.exit_waiting.setdefault(waittid,[]).append(task)
        return True

    # I/O waiting
    def waitforread(self,task,fd):
        """Park ``task`` until descriptor ``fd`` is reported readable."""
        self.read_waiting[fd] = task

    def waitforwrite(self,task,fd):
        """Park ``task`` until descriptor ``fd`` is reported writable."""
        self.write_waiting[fd] = task

    def iopoll(self,timeout):
        """Reschedule every parked task whose descriptor select() reports ready.

        ``timeout`` is passed straight to select.select(). Nothing is
        polled when no task is waiting on I/O.
        """
        if not (self.read_waiting or self.write_waiting):
            return
        readable, writable, _ = select.select(self.read_waiting,
                                              self.write_waiting,[],timeout)
        for fd in readable:
            self.schedule(self.read_waiting.pop(fd))
        for fd in writable:
            self.schedule(self.write_waiting.pop(fd))

    def iotask(self):
        """Coroutine that polls for I/O each time it is scheduled."""
        while True:
            # Wait indefinitely only when the ready queue is empty;
            # otherwise just check and move on.
            self.iopoll(None if self.ready.empty() else 0)
            yield

    def schedule(self,task):
        """Append ``task`` to the ready queue."""
        self.ready.put(task)

    def mainloop(self):
        """Start the I/O polling task, then run ready tasks one step at a time."""
        self.new(self.iotask())
        while self.taskmap:
            current = self.ready.get()
            try:
                outcome = current.run()
                is_syscall = isinstance(outcome,SystemCall)
                if is_syscall:
                    # The system call decides whether the task is rescheduled
                    outcome.task  = current
                    outcome.sched = self
                    outcome.handle()
            except StopIteration:
                self.exit(current)
            else:
                if not is_syscall:
                    self.schedule(current)

# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------

class SystemCall:
    """Base class for requests that a task yields to the scheduler.

    Before calling ``handle()`` the scheduler's main loop sets ``task``
    (the yielding task) and ``sched`` (the scheduler) on the instance.
    """
    def handle(self):
        """Service the request; the base implementation does nothing."""
        return None

# Return a task's ID number
class GetTid(SystemCall):
    """System call that gives the calling task its own task ID."""
    def handle(self):
        caller = self.task
        # The ID is stored as the value to send into the caller next time
        caller.sendval = caller.tid
        self.sched.schedule(caller)

# Create a new task
class NewTask(SystemCall):
    """System call that asks the scheduler to start ``target`` as a new task."""
    def __init__(self,target):
        self.target = target

    def handle(self):
        caller = self.task
        # The new task's ID is what the caller receives from its yield
        caller.sendval = self.sched.new(self.target)
        self.sched.schedule(caller)

# Kill a task
class KillTask(SystemCall):
    """System call that kills the task with ID ``tid``.

    The caller receives True if the task was found in the scheduler's
    task map and False otherwise.
    """
    def __init__(self,tid):
        self.tid = tid
    def handle(self):
        task = self.sched.taskmap.get(self.tid,None)
        if task:
            # Close the coroutine that is currently running, then every
            # suspended caller on the task's stack. Closing only the
            # innermost one would let Task.run() pop a caller and keep
            # the "killed" task running.
            task.target.close()
            while task.stack:
                task.stack.pop().close()
            self.task.sendval = True
        else:
            self.task.sendval = False
        self.sched.schedule(self.task)

# Wait for a task to exit
class WaitTask(SystemCall):
    """System call that suspends the caller until task ``tid`` exits.

    The caller receives True if it was parked on a live task, or False
    if ``tid`` did not name a live task.
    """
    def __init__(self,tid):
        self.tid = tid

    def handle(self):
        caller = self.task
        parked = self.sched.waitforexit(caller,self.tid)
        caller.sendval = parked
        if parked:
            # Scheduler.exit() reschedules the caller when the target exits
            return
        # If waiting for a non-existent task,
        # return immediately without waiting
        self.sched.schedule(caller)

# Wait for reading
class ReadWait(SystemCall):
    """System call that parks the caller until ``f`` is readable."""
    def __init__(self,f):
        self.f = f

    def handle(self):
        # The caller is not rescheduled here; Scheduler.iopoll() puts it
        # back on the ready queue once the descriptor is reported readable.
        descriptor = self.f.fileno()
        self.sched.waitforread(self.task,descriptor)

# Wait for writing
class WriteWait(SystemCall):
    """System call that parks the caller until ``f`` is writable."""
    def __init__(self,f):
        self.f = f

    def handle(self):
        # The caller is not rescheduled here; Scheduler.iopoll() puts it
        # back on the ready queue once the descriptor is reported writable.
        descriptor = self.f.fileno()
        self.sched.waitforwrite(self.task,descriptor)

# ------------------------------------------------------------
#                      === Library Functions ===
# ------------------------------------------------------------

def Accept(sock):
    """Subroutine coroutine: wait for ``sock`` to be readable, then accept.

    Yields a ReadWait request first and the result of ``sock.accept()``
    second.
    """
    yield ReadWait(sock)
    connection = sock.accept()
    yield connection

def Send(sock,buffer):
    """Subroutine coroutine: write all of ``buffer`` to ``sock``.

    Yields a WriteWait request before every send() and drops the bytes
    that send() reports as written until nothing is left.
    """
    pending = buffer
    while pending:
        yield WriteWait(sock)
        sent = sock.send(pending)
        pending = pending[sent:]

def Recv(sock,maxbytes):
    """Subroutine coroutine: wait for ``sock`` to be readable, then receive.

    Yields a ReadWait request first and the result of
    ``sock.recv(maxbytes)`` second.
    """
    yield ReadWait(sock)
    chunk = sock.recv(maxbytes)
    yield chunk

# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------

# Look at the echoserver.py script for an example

<a href="echoserver.py">echoserver.py</a>. A concurrent echo server using coroutines.

In [None]:
# echoserver.py
#
# A concurrent echo server using coroutines

from pyos8 import *
from socket import *

def handle_client(client,addr):
    """Echo data from ``client`` back to it via the Recv/Send subroutines.

    The loop ends when Recv delivers an empty result; the socket is then
    closed.
    """
    print("Connection from", addr)
    data = yield Recv(client,65536)
    while data:
        yield Send(client,data)
        data = yield Recv(client,65536)
    print("Client closed")
    client.close()


def server(port):
    """Accept connections on ``port`` and start a handler task for each one."""
    print("Server starting")
    listener = socket(AF_INET,SOCK_STREAM)
    listener.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    listener.bind(("",port))
    listener.listen(5)
    while True:
        # Accept() is a subroutine coroutine; its result is sent back here
        conn, peer = yield Accept(listener)
        yield NewTask(handle_client(conn,peer))

# Register the echo server on port 45000 as the only user task and run
# the scheduler.
sched = Scheduler()
sched.new(server(45000))
sched.mainloop()

<a href="sockwrap.py">sockwrap.py</a>. A class wrapper that shows how you can emulate the
 socket interface with a collection of coroutine methods.

In [None]:
# sockwrap.py
#
# Coroutine wrapper around a socket object

from pyos8 import *

class Socket(object):
    """Wrapper whose methods are coroutines around a socket's operations.

    Every method is a generator meant to be yielded from a task as a
    subroutine; the last value it yields is the operation's result.
    """
    def __init__(self,sock):
        self.sock = sock

    def accept(self):
        """Wait until readable, then yield ``(Socket(client), addr)``."""
        yield ReadWait(self.sock)
        conn, peer = self.sock.accept()
        yield Socket(conn), peer

    def send(self,buffer):
        """Write all of ``buffer``, waiting for writability before each send."""
        pending = buffer
        while pending:
            yield WriteWait(self.sock)
            sent = self.sock.send(pending)
            pending = pending[sent:]

    def recv(self, maxbytes):
        """Wait until readable, then yield the result of ``recv(maxbytes)``."""
        yield ReadWait(self.sock)
        chunk = self.sock.recv(maxbytes)
        yield chunk

    def close(self):
        """Close the wrapped socket and yield what its close() returned."""
        outcome = self.sock.close()
        yield outcome

<a href="echoserver2.py">echoserver2.py</a>. An echo server using the class-based interface.

In [None]:
# echoserver2.py
#
# A concurrent echo server using coroutines

from pyos8 import *
from socket import *
from sockwrap import Socket

def handle_client(client,addr):
    """Echo data from ``client`` back to it using its coroutine methods.

    The loop ends when a falsy value is sent back in response to a
    ``client.recv()`` request; ``client.close()`` is then yielded.
    """
    print("Connection from", addr)
    data = yield client.recv(65536)
    while data:
        yield client.send(data)
        data = yield client.recv(65536)
    print("Client closed")
    yield client.close()

def server(port):
    """Accept connections on ``port`` and start a handler task for each one."""
    print("Server starting")
    listener = socket(AF_INET,SOCK_STREAM)
    listener.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    listener.bind(("",port))
    listener.listen(1024)

    # Wrap the raw socket so accept() can be yielded as a subroutine
    wrapped = Socket(listener)
    while True:
        conn, peer = yield wrapped.accept()
        yield NewTask(handle_client(conn,peer))

# Register the echo server on port 45000 as the only user task and run
# the scheduler.
sched = Scheduler()
sched.new(server(45000))
sched.mainloop()

<a href="http://twistedmatrix.com">Twisted</a> to run this.
 One reason why I have included this here is that the low-level I/O
 handling is very similar. 
 Specifically, in our "operating system", I/O events are tied into task
 scheduling.  In Twisted, I/O events are tied into event handlers
 (Reactors).

In [None]:
#!/usr/bin/python
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

#

from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor

### Protocol Implementation

# This is just about the simplest possible protocol
class Echo(Protocol):
    """Protocol that writes every chunk of received data back to its transport."""
    def connectionMade(self):
        """Print a message when a connection is made."""
        print("Got a connection")

    def dataReceived(self, data):
        """As soon as any data is received, write it back."""
        self.transport.write(data)


def main():
    """Listen on TCP port 45000 with an Echo factory and run the reactor."""
    factory = Factory()
    factory.protocol = Echo
    reactor.listenTCP(45000, factory)
    reactor.run()

# Start the echo server only when this file is executed as a script
if __name__ == '__main__':
    main()

# Design Commentary

# Contact me