In [1]:
def say_hello(name):
    yield 'Hello ' + str(name)
    
assert say_hello('An').send(None) == 'Hello An', 'Should equal to "Hello An"'

def say_hello_twice(name):
    yield 'Hello ' + str(name)
    yield 'Hello ' + str(name)

In [2]:
class Task():
    
    _id = 1
    
    def __init__(self, coro):
        self._coro = coro
        self.id = Task._id
        self.send_val = None
        Task._id += 1
        
    def run(self):
        return self._coro.send(self.send_val)
    
first_task = Task(say_hello('An'))
assert first_task.id == 1, 'The first task should have id = 1'

second_task = Task(say_hello('An'))
assert second_task.id == 2, 'The second task should have id = 2'
assert second_task.run() == 'Hello An'

In [3]:
class SystemCall:
    
    def handle(self):
        raise NotImplementedError

In [4]:
class GetID(SystemCall):
    
    def handle(self, schd: 'Scheduler', current_task: Task):
        current_task.send_val = current_task.id
        
        schd.schedule(current_task)
        
        return current_task.id
    
class NewTask(SystemCall):
    
    def __init__(self, target):
        self.target = target
        
    def handle(self, schd: 'Scheduler', current_task: 'Task'):
        new_task_id = schd.new(self.target)
        
        current_task.send_val = new_task_id
        
        schd.schedule(current_task)
    
    
class KillTask(SystemCall):
    
    def __init__(self, tid):
        self.tid = tid
    
    def handle(self, schd: 'Scheduler', current_task: 'Task'):
        task = schd.ready_tasks.get(self.tid, None)
        if task:
            task._coro.close()
            current_task.send_val = True
        else:
            current_task.send_val = False
            
        schd.schedule(current_task)

In [6]:
from queue import Queue


class Scheduler:
    
    def __init__(self):
        self.queue = Queue()
        self.ready_tasks = {}
    
    def new(self, coro):
        new_task = Task(coro)
        self.ready_tasks[new_task.id] = new_task
        self.schedule(new_task)
        
        return new_task.id
    
    def schedule(self, task):
        self.queue.put(task)
        
    def exit(self, task):
        del self.ready_tasks[task.id]
        
    def run(self):
        i = 0
        while self.ready_tasks:
            i += 1
            # print('Running clock cycle #{}'.format(i))
            current_task = self.queue.get()

            try:
                result = current_task.run()
                # print('-------> Result is:', result)
                if isinstance(result, SystemCall):
                    print('!!!!! Got a system call here:', result)
                    result.handle(self, current_task)
                    continue
            except StopIteration:
                print('xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx', current_task.id)
                self.exit(current_task)
            else:
                self.schedule(current_task)
            


In [7]:
def print_my_own_id():
    my_id = (yield GetID())
    yield 'My id is: ' + str(my_id)
    
def parent_task():
    tid = yield NewTask(say_hello_twice('Sad Child'))
    for i in range(5):
        print("I have a child:", tid)
    
    yield KillTask(tid)
        
def kind_parent_task():
    tid = yield NewTask(say_hello_twice('Lucky Child'))
    for i in range(5):
        print("I have a child:", tid)

In [8]:
schd = Scheduler()
schd.new(say_hello('An'))
schd.new(say_hello('Vien'))
schd.new(say_hello_twice('ANN'))
schd.new(print_my_own_id())
schd.new(kind_parent_task())
schd.new(parent_task())
schd.run()

!!!!! Got a system call here: <__main__.GetID object at 0x00000226076EE910>
!!!!! Got a system call here: <__main__.NewTask object at 0x00000226076EEE50>
!!!!! Got a system call here: <__main__.NewTask object at 0x00000226076EE160>
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 3
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 4
I have a child: 9
I have a child: 9
I have a child: 9
I have a child: 9
I have a child: 9
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 7
I have a child: 10
I have a child: 10
I have a child: 10
I have a child: 10
I have a child: 10
!!!!! Got a system call here: <__main__.KillTask object at 0x00000226076FF790>
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 5
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 6
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 10
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 8
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 9


In [9]:
import select

def ReadWait(sock):
    '''
    Check if a file descriptor (fd) is ready for reading/writing
    select.select(rlist, wlist, xlist[, timeout]) will accept 3 lists of fds:
        rlist: wait until this list is ready for reading
        wlist: wait until this list is ready for writing
        xlist: wait until an exceptional condition
        timeout: specify the time to wait for. if the timeout is omited, block and wait until at least one fd ready
                 if timeout = 0, represent a poll, no blocking
    return values are three lists of objects that are ready. They are subset of the rlist, wlist, and xlist.
    '''
    print('Waiting for data from:', sock)
    while True:
        r, w, _ = select.select([sock], [sock], [], 0)
        if not r:
            yield
        else:
            print('r:', r)
            print('w:', w)
            return
    
    

In [10]:
from socket import socket, AF_INET, SOCK_STREAM, SOCK_DGRAM


def handle_request(client, addr):
    print('Connection from ...', addr)
    command = ''
    while True:
        yield from ReadWait(client)
        data = client.recv(65536)
        print('Receiving data:', data)
        if not data or data == b'\x03':
            break
        
        if data == b'\r\n':
            print('Receiving command:', command)
            client.send(data)
            command = ''
        else:
            command += data.decode()
        
        print('Sending data:', data)
        client.send(b'')
    client.close()
    yield

    
def server(port=1234):
    print('Starting server at port:', port)
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(("", port))
    sock.listen(5)
    while True:
        print('Waiting for new connection')
        yield from ReadWait(sock)
        client, addr = sock.accept()
        yield NewTask(handle_request(client, addr))

In [11]:
schd = Scheduler()
schd.new(server())
schd.new(kind_parent_task())
schd.run()

Starting server at port: 1234
Waiting for new connection
Waiting for data from: <socket.socket fd=1160, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 1234)>
!!!!! Got a system call here: <__main__.NewTask object at 0x00000226075FED60>
I have a child: 13
I have a child: 13
I have a child: 13
I have a child: 13
I have a child: 13
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 12
xxxxxxxxxxxxxxxxxx Ending task xxxxxxxxxxxxxxxxxx 13
r: [<socket.socket fd=1160, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 1234)>]
w: []
!!!!! Got a system call here: <__main__.NewTask object at 0x00000226075FEDC0>
Connection from ... ('127.0.0.1', 52575)
Waiting for data from: <socket.socket fd=1224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 52575)>
Waiting for new connection
Waiting for data from: <socket.socket fd=1160, family=AddressFamily.AF_INET, typ

r: [<socket.socket fd=1224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 52575)>]
w: [<socket.socket fd=1224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 52575)>]
Receiving data: b's'
Sending data: b's'
Waiting for data from: <socket.socket fd=1224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 52575)>
r: [<socket.socket fd=1224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 52575)>]
w: [<socket.socket fd=1224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 52575)>]
Receiving data: b'd'
Sending data: b'd'
Waiting for data from: <socket.socket fd=1224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), 

KeyboardInterrupt: 

This is a simple echo server, running via tcp connection. What ever charater input by client, the server will echo back.
Overall process:
1. The server starts
2. Create a socket then listen from this socket.
3. server wait until a client connects. sock.accept() will block until a client connects. Return values of `accept()` are `client` and `addr`. `client` another socket object.
4. When a client connects, the server will create and schedule a task to handle input from the client.
5. The handling is simple: read the client's input `client.recv()` and send back the very same data `client.send()`
6. At first there are two places the system blocks:
 1. while waiting for client. `socket.accept()` will block
 2. while waiting for client's input. `client.recv()` will block
 
**Note**: These are typical IO operations
 
Thus, we __cannot__ serve more than 1 client at the time. Because the system will block at a certain point in time.

To solve this issue, we implement an asynchronous function to check for the readiness of socket data, either it's a new client connection or client's input. The check follow below steps:
1. Before attempt to read (and block) the client's intput or client connection
2. Check the fd with `select.select()` to know whether a socket is ready for reading/writing.
3. If nothing is ready, `yield` the control back to other processes.
4. If fd is ready, exit and return the control back to it's caller to proceed to read/accept client

This will help our echo server to serve multiple clients at once. just like a busy restaurant with only one waiter.

# Conclusion
Eventhough this is a simple server but it demontrates very well following concept:
By handling work asynchronously, we have leveraged CPU time during IO operation to perform multiple tasks.
We have to make sure that no task is blocking. Otherwise, the whole system will be block somewhere.
Processing is different from blocking.
A task is blocking is when it is waiting for some io operation but doesn't release the CPU.
If it using CPU to process the work, it's still blocking other tasks from running but this is acceptable behavior as long as it doesn't make other tasks wait too long (how long is too long depends on the system. An online game is more demanding for response time then a file uploading system.)


In [1]:
# import the socketserver module of Python

import socketserver

 

# Create a Request Handler

# In this TCP server case - the request handler is derived from StreamRequestHandler

class MyTCPRequestHandler(socketserver.StreamRequestHandler):

 

# handle() method will be called once per connection

    def handle(self):

        # Receive and print the data received from client

        print("Recieved one request from {}".format(self.client_address[0]))

        msg = self.rfile.readline().strip()

        print("Data Recieved from client is:".format(msg))

        print(msg)  

       

        # Send some data to client

        self.wfile.write("Hello Client....Got your message".encode())

            

# Create a TCP Server instance

aServer         = socketserver.TCPServer(("127.0.0.1", 9090), MyTCPRequestHandler)

 

# Listen for ever

aServer.serve_forever()

Recieved one request from 127.0.0.1
Data Recieved from client is:
b'asdf'


KeyboardInterrupt: 