In [2]:
import time
from collections import deque

class Scheduler:
    
    def __init__(self):
        self.ready = deque()
        
    def run(self):
        while self.ready:
            func = self.ready.popleft()
            func()

def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        n -=1

def countup():
    x=0
    while x < stop:
        print('Up', x)
        time.sleep(1);
        x +=1

In [3]:
# Sequential
countdown(5)
countup(5)

Down 5
Down 4
Down 3
Down 2
Down 1
Up 0
Up 1
Up 2
Up 3
Up 4


In [5]:
# Problem: How to make concurrent?
import threading

threading.Thread(target=countdown, args=(5,)).start()
threading.Thread(target=countup, args=(5,)).start()

Down 5
Up 0
Down 4
Up 1
Down 3
Up 2
Down 2
Up 3
Down 1
Up 4


In [8]:
# Problem: How to make concurrent? But NO THREADS
# Issue: How would you switch between functions?


from collections import deque

import time


class Scheduler:

    def __init__(self):
        self.ready = deque()

    def run(self):
        while self.ready:
            func = self.ready.popleft()
            func()

    def call_soon(self, func):
        self.ready.append(func)


sched = Scheduler()


def countdown(n):
    if n > 0:
        print('Down', n)
        time.sleep(1)
        sched.call_soon(lambda: countdown(n - 1))


def countup(stop, x = 0):
    if x < stop:
        print('Up', x)
        time.sleep(1)
        sched.call_soon(lambda: countup(stop, x+1))


sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(5))
sched.run()


Down 5
Up 0
Down 4
Up 1
Down 3
Up 2
Down 2
Up 3
Down 1
Up 4


In [9]:
import heapq
from collections import deque

import time


class Scheduler:

    def __init__(self):
        self.ready = deque()
        self.sleeping = []

    def run(self):
        while self.ready or self.sleeping:
            # Do something if empty
            if not self.ready:
                deadline, func = self.sleeping.pop(0)
                delta = deadline - time.time()
                if delta < 0:
                    delta = 0
                time.sleep(deadline)
            func = self.ready.popleft()
            func()

    def call_soon(self, func):
        self.ready.append(func)

    def call_later(self, delay, func):
        deadline = time.time() + delay
        heapq.heqppush(self.sleeping.append((deadline, func))
       # self.sleeping.sort()

sched = Scheduler()


def countdown(n):
    if n > 0:
        print('Down', n)
       # time.sleep(1)
        sched.call_soon(2, lambda: countdown(n - 1))


def countup(stop, x = 0):
    if x < stop:
        print('Up', x)
     #   time.sleep(1)
        sched.call_soon(1, lambda: countup(stop, x+1))


sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(5))
sched.run()


SyntaxError: invalid syntax (<ipython-input-9-7b7c0020c9f6>, line 33)

In [10]:
import time
import queue
import threading

def producer(queue, count):
    for n in range(count):
        print('Producing', n)
        queue.put(n)
        time.sleep(1)
    queue.put(None)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None: break
        print('Consumed', item)

q = queue.Queue()
threading.Thread(target=producer, args=(q, 5)).start()
threading.Thread(target=consumer, args=(q, 5)).start()


Producing 0


Exception in thread Thread-11:
Traceback (most recent call last):
  File "C:\Users\gopir\Anaconda3\lib\threading.py", line 917, in _bootstrap_inner
    self.run()
  File "C:\Users\gopir\Anaconda3\lib\threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
TypeError: consumer() takes 1 positional argument but 2 were given



Producing 1
Producing 2
Producing 3
Producing 4


In [13]:
# Yeild:-

def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        yield
        n -=1

def countup(stop):
    x=0
    while x < stop:
        print('Up', x)
        time.sleep(1);
        yield
        x +=1

In [17]:
from collections import deque

import time


class Scheduler:
    
    def __init__(self):
        self.ready = deque()
        
    def new_task(self, gen):
        self.ready.append(gen)
        
        
    def run(self):
        while self.ready:
            gen = self.ready.popleft()
            try:
                next(gen)   #'iteration'
                self.ready.append(gen)
            except StopIteration:
                pass
            
def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        yield                    #
        n -=1

def countup(stop):
    x=0
    while x < stop:
        print('Up', x)
        time.sleep(1);
        yield
        x +=1
        
sched = Scheduler()

sched.new_task(countdown(5))
sched.new_task(countup(5))
sched.run()

Down 5
Up 0
Down 4
Up 1
Down 3
Up 2
Down 2
Up 3
Down 1
Up 4


In [18]:
from collections import deque

import time


class Scheduler:
    
    def __init__(self):
        self.ready = deque()
        
    def new_task(self, gen):
        self.ready.append(gen)
        
        
    def run(self):
        while self.ready:
            coro = self.ready.popleft()
            try:
                coro.send(None)         # Drive a coroutine
                self.ready.append(coro)
            except StopIteration:
                pass


class Awaitable:
    
    def __await__(self):
        yield
        
def switch():
    return Awaitable()

async def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        await switch()
        yield                    #
        n -=1

async def countup(stop):
    x=0
    while x < stop:
        print('Up', x)
        await switch()
        time.sleep(1);        
        x +=1 
        

sched = Scheduler()

sched.new_task(countdown(5))
sched.new_task(countup(5))
sched.run()

AttributeError: 'async_generator' object has no attribute 'send'

In [19]:
from collections import deque
import heapq
import time


class Scheduler:
    
    def __init__(self):
        self.ready = deque()
        se
        
    def new_task(self, gen):
        self.ready.append(gen)
    
        
    def run(self):
        while self.ready:
            coro = self.ready.popleft()
            try:
                coro.send(None)         # Drive a coroutine
                self.ready.append(coro)
            except StopIteration:
                pass


class Awaitable:
    
    def __await__(self):
        yield
        
def switch():
    return Awaitable()

async def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        await switch()
        yield                    #
        n -=1

async def countup(stop):
    x=0
    while x < stop:
        print('Up', x)
        await switch()
        time.sleep(1);        
        x +=1 
        

sched = Scheduler()

sched.new_task(countdown(5))
sched.new_task(countup(5))
sched.run()

NameError: name 'se' is not defined

In [20]:
# producer-consumer
# Challenge: Build an async queue

import time
from collections import deque
import heapq

class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = [ ] 
        self.sequence = 0

    def call_later(self, delay, func):
        deadline = time.time() + delay
        self.sequence += 1
        heapq.heappush(self.sleeping, 
                       (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            # Do something if empty
            if not self.ready:
                deadline, _, func = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta < 0:
                    delta = 0
                time.sleep(delta)
                self.ready.append(func)
            func = self.ready.popleft()
            func()

    def call_soon(self, func):
        self.ready.append(func)
        
sched = Scheduler()

class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque() 
    def put(self, item):
        self.items.append(item)
        if self.waiting:
            sched.call_soon(self.waiting.popleft())

    def get(self, callback):
        if not self.items:
            self.waiting.append(
                lambda: self.get(callback)
                )
        else:
            callback(self.items.popleft())

def producer(queue, count, n=0):
    if n < count:
        print('Producing', n)
        queue.put(n)
        sched.call_later(1, 
                         lambda : producer(queue, count, n+1))
    else:
        queue.put(None)

def consumer(queue):
    def got_it(item):
        if item is not None:
            print('Consumed', item)
            sched.call_later(3, lambda: consumer(queue))
    queue.get(callback=got_it)  # 

q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q))
sched.run()


Producing 0
Consumed 0
Producing 1
Producing 2
Consumed 1
Producing 3
Producing 4
Producing 5
Consumed 2
Producing 6
Producing 7
Producing 8
Consumed 3
Producing 9
Consumed 4
Consumed 5
Consumed 6
Consumed 7
Consumed 8
Consumed 9


In [None]:
a + b
a__add__(b)