`yield from`
===

In [5]:
def braced_chain(*gens, before='{', after='}'):
    for gen in gens:
        yield before
        for x in gen:
            yield x
        yield after

In [6]:
g2 = lambda: (x**2 for x in range(3))
g3 = lambda: (x**3 for x in range(3))
g4 = lambda: (x**4 for x in range(3))

In [9]:
list(braced_chain(g2(), g3(), g4(), before='<', after='>'))

['<', 0, 1, 4, '>', '<', 0, 1, 8, '>', '<', 0, 1, 16, '>']

In [8]:
def braced_chain(*gens, before='{', after='}'):
    for gen in gens:
        yield before
        yield from gen
        yield after

Корутины
===

In [10]:
class Mean:
    def __init__(self):
        self._sum = 0
        self._n = 0
        
    def add(self, x):
        self._sum += x
        self._n += 1
        
        return x >= self._sum / self._n

In [11]:
m = Mean()

In [12]:
m.add(1)

True

In [13]:
m.add(10)

True

In [14]:
m.add(2)

False

In [15]:
m.add(3)

False

In [16]:
m.add(4)

True

In [17]:
# будем посылать новые данные в x и получать result
def mean():
    result = None
    s = 0
    n = 0
    while True:
        x = yield result
        s += x
        n += 1
        result = x >= s / n

In [18]:
m = mean()

In [19]:
m

<generator object mean at 0x10ae84660>

In [20]:
next(m)

In [21]:
m.send(1)

True

In [22]:
m.send(10)

True

In [23]:
m.send(2)

False

In [24]:
m.send(3)

False

In [25]:
m.send(4)

True

In [26]:
def mean():
    result = None
    s = 0
    n = 0
    while True:
        x = yield result
        if x is None:
            break
        s += x
        n += 1
        result = x >= s / n

In [27]:
m = mean()

In [28]:
next(m)

In [29]:
m.send(1)

True

In [30]:
m.send(2)

True

In [31]:
m.send(None)

StopIteration: 

`RestoringGet`
===

In [None]:
logger = logging.getLogger(name)


class RestoringGet:
    MAX_RESTORES = 10

    def init(self, url, get_kwargs=None):
        if get_kwargs is None:
            get_kwargs = {}

        self._url = url
        self._kwargs = get_kwargs

    def get_generator(self):
        restores = 0
        offset = 0
        headers = {}
        while True:
            restores += 1
            if restores > self.MAX_RESTORES:
                raise TooManyRestores()

            response = requests.get(
                self._url, headers=headers, **self._kwargs)
            response.raise_for_status()

            real_length = yield response

            content_length = parse_int(
                response.headers.get('Content-Length'), None)
            if content_length is None \
                or content_length + offset <= real_length:
                break

            logger.info(
                'GET looks to be interrupted, trying to continue')
            offset = real_length
            headers = {'Range': 'bytes={}-'.format(offset)}

In [None]:
gen = RestoringGet(...).get_generator()
response = next(gen)
while response:
    file = save_response(response)
    try:
        response = gen.send(file.size)
    except StopIteration:
        response = None

`yield from` для корутин
===

In [32]:
def braced_chain(*gens, before='{', after='}'):
    for gen in gens:
        yield before
        for x in gen:
            yield x
        yield after

In [33]:
b = braced_chain(g2(), mean())

In [34]:
list(b)

['{', 0, 1, 4, '}', '{', None, '}']

In [36]:
def braced_chain(*gens, before='{', after='}'):
    for gen in gens:
        yield before
        yield from gen
        yield after

In [37]:
m = mean()
b = braced_chain(g2(), m)
(
    next(b), next(b), next(b), next(b), next(b),
    next(b), next(b), b.send(1), b.send(2), b.send(None)
)

('{', 0, 1, 4, '}', '{', None, True, True, '}')

In [None]:
next(b)
next(b)
next(b)
next(b)
next(b),
    next(b), next(b), b.send(1), b.send(2), b.send(None)


In [38]:
b.send(13)

StopIteration: 

`return` в генераторе
===

In [39]:
def mean():
    result = None
    s = 0
    n = 0
    while True:
        x = yield result
        if x is None:
            break
        s += x
        n += 1
        result = x >= s / n
    return s / n

In [45]:
try:
    m = mean()
    next(m), m.send(1), m.send(2), m.send(None)
except Exception as e:
    print(float(e))

TypeError: float() argument must be a string or a number, not 'StopIteration'

In [47]:
def braced_chain(*gens, before='{', after='}'):
    results = []
    for gen in gens:
        yield before
        result = yield from gen
        results.append(result)
        yield after
    return results # показываем что накопил yield from

In [48]:
m = mean()
b = braced_chain(g2(), m)
(
    next(b), next(b), next(b), next(b), next(b),
    next(b), next(b), b.send(1), b.send(2), b.send(None)
)

('{', 0, 1, 4, '}', '{', None, True, True, '}')

In [49]:
next(b)

StopIteration: [None, 1.5]

Communicate
===

In [50]:
import subprocess

In [51]:
p = subprocess.run(['python', '-c', 'open("/tmp/5555", "w").close()'])

In [52]:
p = subprocess.run(
    ['perl', '-E', 'say "out"; warn "err\n"'],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

In [53]:
p.stdout, p.stderr

(b'out\n', b'err\n')

In [55]:
from subprocess import Popen

In [59]:
p = Popen(
    ['perl', '-E', 'say "out"; warn "err\n"'],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

In [60]:
p.communicate()

(b'out\n', b'err\n')

In [61]:
while True:
    out = p.stdout.readline()
    err = p.stderr.readline()
    if out:
        print(f'out: {out}')
    if err:
        print(f'err: {err}')
    if not out and not err:
        break       

ValueError: readline of closed file

In [None]:
# А что если заблочимся?

In [None]:
p = Popen(
    ['perl', '-E', 'warn "err\n" x 1000000; say "out";'],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

Threads
===

In [62]:
import threading
threads_num = 4

p = Popen(
    ['perl', '-E', 'warn "err\n"; say "out";'],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

def log(fh, prefix):
    while True:
        line = fh.readline()
        if line:
            print('{}: {}'.format(prefix, line.decode('utf8')), end='')
        else:
            break

threads = [
    threading.Thread(target=log, args=(p.stdout, 'out')),
    threading.Thread(target=log, args=(p.stderr, 'err')),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

err: err
out: out


Selector
===

In [63]:
import selectors

sel = selectors.DefaultSelector() # дай самый эффективный селектор

In [64]:
sel

<selectors.KqueueSelector at 0x10afd8a00>

In [65]:
p = Popen(
    ['perl', '-E', 'warn "err\n"; say "out"; warn "2\n"'],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
sel.register(p.stdout, selectors.EVENT_READ, lambda l: print('out: {}'.format(l)))
sel.register(p.stderr, selectors.EVENT_READ, lambda l: print('err: {}'.format(l)))

SelectorKey(fileobj=<_io.BufferedReader name=81>, fd=81, events=1, data=<function <lambda> at 0x10b318430>)

In [66]:
while True:
    events = sel.select()
    if not events:
        break
    for key, mask in events:
        callback = key.data
        line = key.fileobj.readline()
        if line:
            callback(line)
        else:
            sel.unregister(key.fileobj)

err: b'err\n'
out: b'out\n'
err: b'2\n'


KeyboardInterrupt: 

`asyncio`
===

In [None]:
# на самом деле питон в момент времени выполняет одну асинхронную функцию

In [74]:
import asyncio
loop = asyncio.get_event_loop() # всегда один

In [68]:
p = asyncio.create_subprocess_exec(
    'perl', '-E', 'warn "err\n"; say "out"; warn "2\n"',
    stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)

In [69]:
p

<coroutine object create_subprocess_exec at 0x10b430740>

In [70]:
next(p)

TypeError: 'coroutine' object is not an iterator

In [71]:
@asyncio.coroutine
def execute():
    p = yield from asyncio.create_subprocess_exec(
        'perl', '-E', 'warn "err\n"; say "out"; warn "2\n"',
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    print('>>> {}'.format(p))

  def execute():


In [72]:
execute()

<generator object execute at 0x10b3abd60>

In [75]:
loop.run_until_complete(execute())

RuntimeError: This event loop is already running

In [76]:
@asyncio.coroutine
def log(fh, prefix):
    while True:
        line = yield from fh.readline()  ###
        if line:
            print('{}: {}'.format(prefix, line.decode('utf8')), end='')
        else:
            break

  def log(fh, prefix):


In [77]:
@asyncio.coroutine
def execute(loop):
    p = yield from asyncio.create_subprocess_exec(
        'perl', '-E', 'warn "err\n"; say "out"; warn "2\n"',
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    
    loop.create_task(log(p.stdout, 'stdout'))
    loop.create_task(log(p.stderr, 'stderr'))
    
    yield from p.wait()

  def execute(loop):


In [78]:
loop.run_until_complete(execute(loop))

RuntimeError: This event loop is already running

`async` / `await`
===

In [81]:
# await - отпускаем управление

async def x():
    await x()

In [82]:
async def log(fh, prefix):
    while True:
        line = await fh.readline()
        if line:
            print('{}: {}'.format(prefix, line))
        else:
            break

In [83]:
async def execute(loop):
    p = await asyncio.create_subprocess_exec(
        'perl', '-E', 'warn "err\n"; say "out"; warn "2\n"',
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    
    loop.create_task(log(p.stdout, 'stdout'))
    loop.create_task(log(p.stderr, 'stderr'))
    
    await p.wait()

In [84]:
loop.run_until_complete(execute(loop))

RuntimeError: This event loop is already running

`aiohttp`
===

In [85]:
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()  # return yield from
        else:
            return f'ERROR: {response.status}'

async def download_wiki(article):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://de.wikipedia.org/{}'.format(article))
        return html[:15]

# в одном треде ждем много вещей
loop = asyncio.get_event_loop()
tasks = asyncio.gather(
    download_wiki('wiki/Évariste_Galois'),
    download_wiki('wiki/Alan_Turing'),
    download_wiki('zzz'),
)

In [86]:
tasks

<_GatheringFuture finished result=['<!DOCTYPE html>', '<!DOCTYPE html>', 'ERROR: 404']>

In [87]:
loop.run_until_complete(tasks)

RuntimeError: This event loop is already running

Сервер
===

In [None]:
from aiohttp import web