<center>
    <img src="https://upload.wikimedia.org/wikipedia/commons/a/a8/%D0%9B%D0%9E%D0%93%D0%9E_%D0%A8%D0%90%D0%94.png" width=500px/>
    <font>Python 2024</font><br/>
    <br/>
    <br/>
    <b style="font-size: 2em">Итераторы и генераторы</b><br/>
    <br/>
    <font>Ярослав Золотарёв, по материалам Вадима Мазаева🫡</font><br/>
</center>

## План лекции

* Итераторы (45 min)
* Генераторы как итераторы (25 min)
* Генераторы как корутины (40 min)

# Итераторы

In [None]:
!touch file.txt

for i in range(5):
    pass
    
for line in open('file.txt'):
    pass

for key in {'A' : 1, 'B' : 2, 'C' : 3}:
    pass
    
for letter in 'Hello, World':
    pass

In [None]:
iterable = [1, 2, 3]
iterator = iterable.__iter__()
iterator

In [None]:
iterator.__next__()

In [None]:
iterator.__next__()

In [None]:
iterator.__next__()

In [None]:
iterator.__next__()

### [Iterable](https://docs.python.org/3/glossary.html#term-iterator)

Это объект, у которого определён метод `__iter__`,

возвращающий **итератор**.

Примеры: `list`, `dict`, `range`

### [Iterator](https://docs.python.org/3/glossary.html#term-iterator)

Это объект, у которого определены методы `__iter__` и `__next__`.

Метод `__iter__` должен возвращать сам итератор (`self`).

Метод `__next__` должен возвращать следующий элемент,

а если их не осталось, выкидывать исключение  `StopIteration`.

## [iter](https://docs.python.org/3/library/functions.html#iter) & [next](https://docs.python.org/3/library/functions.html#next)

In [None]:
iterator = iter([1])  # aka __iter__
iterator

In [None]:
next(iterator)

In [None]:
next(iterator)

In [None]:
next(iterator, 'some value')  # dct.get(key, 'some value')

## Вторая форма оператора iter  <img src="smugjack.jpg" alt="drawing" width="20" style="display:inline; vertical-align:-1px; margin-right:5px; margin-left:5px;"/>

In [None]:
import io

stream = io.StringIO('abcdefghi')

def read3() -> str:
    return stream.read(3)

In [None]:
iter(read3, '')  # every __next__ translates to __call__

In [None]:
for idx, chunk in enumerate(iter(read3, '')):  # iter(callable, sentinel)
    print(chunk, end=' ')
    if idx > 100000:  # Playground
        break

In [None]:
from collections.abc import Callable

def make_timer(ticks: int) -> Callable[[], int]:
    
    def _timer() -> int:
        nonlocal ticks
        ticks -= 1
        return ticks

    return _timer

In [None]:
timer = make_timer(2)

In [None]:
timer()

In [None]:
timer()

In [None]:
for i in iter(make_timer(10), -1):
    print(i, end=' ')

## Реализация цикла for через while

In [None]:
for value in sequence:
    ...

In [None]:
iterator = iter(sequence)
while True:
    try:
        value = next(iterator)
    except StopIteration:
        break
    else:
        ...

## О хранении итератором состояния

In [None]:
iterable = range(10)
iterable

In [None]:
iterator = iter(iterable)
iterator

In [None]:
zip_iterator = zip(iterator, iterator)
zip_iterator

In [None]:
for pair in zip_iterator:
    print(pair, end=' ')

## Объект и его итератор

In [None]:
iterable = open('IteratorsGenerators.ipynb')
iterable

In [None]:
iter(iterable)

In [None]:
iterable is iter(iterable)

In [None]:
class TextIOWrapper:
    ...
    
    def __iter__(self) -> 'TextIOWrapper':
        return self

    ...

In [None]:
class FilelikeRange:
    def __init__(self, start: int, stop: int) -> None:
        self._index = start
        self._stop = stop

    def __iter__(self) -> 'FilelikeRange':
        return self

    def __next__(self) -> int:
        if self._index >= self._stop:
            raise StopIteration()
        value = self._index
        self._index += 1
        return value

In [None]:
for i in FilelikeRange(0, 5):
    print(i, end=' ')

In [None]:
iterable = [1, 2, 3, 4, 5]
iterable

In [None]:
iter(iterable)

In [None]:
iterable is iter(iterable)

In [None]:
class ListlikeRange:
    class Iterator:
        def __init__(self, start: int, stop: int) -> None:
            self._index = start
            self._stop = stop
            
        def __iter__(self) -> 'Iterator':
            return

        def __next__(self) -> int:
            if self._index >= self._stop:
                raise StopIteration()
            value = self._index
            self._index += 1
            return value

    def __init__(self, start: int, stop: int) -> None:
        self._start = start
        self._stop = stop

    def __iter__(self) -> Iterator:
        return self.Iterator(self._start, self._stop)

In [None]:
for i in ListlikeRange(0, 5):
    print(i, end=' ') 

## Истощаемость

In [None]:
filelike_range = FilelikeRange(1, 5)

for elem in filelike_range:
    print(elem, end=' ')
    
for elem in filelike_range:
    print(elem, end=' ')

In [None]:
listlike_range = ListlikeRange(1, 5)

for elem in listlike_range:
    print(elem, end=' ')
    
for elem in listlike_range:
    print(elem, end=' ')

In [None]:
print((iter(listlike_range)), iter(listlike_range))

## [Sequence](https://docs.python.org/3/glossary.html#term-sequence) как iterable

In [None]:
from typing import TypeVar

T = TypeVar('T')

class Sequence:
    def __init__(self, *args: T) -> None:
        self._args = args
        
    def __len__(self) -> int:
        return len(self._args)

    def __getitem__(self, index: int) -> T:  # legacy
        if index < 0 or index >= len(self):
            raise IndexError(index)  # expected by for to detect eos
        return self._args[index]

In [None]:
seq = Sequence(1, 2, 3, 4, 5)
seq[0], seq[2], seq[4]

In [None]:
for i in seq:
    print(i, end=' ')

## [\_\_contains__](https://docs.python.org/3/reference/datamodel.html#object.__contains__)

In [None]:
3 in range(5)

In [None]:
from typing import Any

# https://docs.python.org/3.12/reference/expressions.html#membership-test-details
# default __contains__ looks like
def __contains__(self, value: Any) -> bool:
    for item in self:
        if item is value or item == value:
            return True
    return False

In [None]:
class MyRange:
    def __contains__(self, value: int) -> bool:
        return 0 <= value < self._stop
    
    ...

In [None]:
seq = Sequence(2, 3, 5, 8, 13, 21)

In [None]:
for i in seq:
    print(i, end=' ')

In [None]:
8 in seq  # object has no __contains__, so "in" uses iteration over __getitem__

## Некоторые функции для работы с итераторами

### [enumerate](https://docs.python.org/3/library/functions.html#enumerate)

In [None]:
# do NOT use range(len(...))
for i, char in enumerate('sample'):  # enumerate(iterable, start=index)
    print(i, char)  

### [zip](https://docs.python.org/3/library/functions.html#zip)

In [None]:
for left, right in zip('ABCD', 'xy'):
    print(left + right)

In [None]:
for left, right in zip('ABCD', 'xy', strict=True):
    print(left + right)

In [None]:
from itertools import zip_longest

for left, right in zip_longest('ABCD', 'xy', fillvalue='-'):
    print(left + right)

### Обращаем zip

In [None]:
x = [1, 2, 3]
y = [4, 5, 6]
zipped = zip(x, y)

In [None]:
for item in zipped:
    print(item, end=' ')

In [None]:
zipped = zip(x, y)

In [None]:
x2, y2 = zip(*zipped)
print(x2, y2)

### [map](https://docs.python.org/3/library/functions.html#map) & [filter](https://docs.python.org/3/library/functions.html#filter)

In [None]:
for squared in map(lambda x: x ** 2, range(5)):
    print(squared, end=' ')

In [None]:
for filtered in filter(lambda x: x % 2 == 0, range(10)):
    print(filtered, end=' ')

## [itertools](https://docs.python.org/3/library/itertools.html) & [more](https://more-itertools.readthedocs.io/en/stable/)

### [itertools.chain](https://docs.python.org/3/library/itertools.html#itertools.chain)

In [None]:
from itertools import chain

In [None]:
for elem in (range(5) + [10, 20] + 'sample' + [[i] for i in range(5)]):
    print(elem, end=' ')

In [None]:
for elem in chain(range(5), [10, 20], 'sample', [[i] for i in range(5)]):
    print(elem, end=' ')

In [None]:
from typing import Any

def repeat(times: int, obj: Any) -> list[Any]:
    return [obj] * times

In [None]:
list(repeat(5, [1, 2, 3]))

In [None]:
list(chain.from_iterable(repeat(5, [[1], 2, 3])))  # снимаем 1 уровень вложенности

### Как проверить объект на итерируемость (с гарантией)

In [None]:
try:
    iter(object_to_test)
except TypeError:
    # not an iterable
    ...
else:
    # iterable
    ...

### [itertools.tee](https://docs.python.org/3/library/itertools.html#itertools.tee)

In [None]:
from itertools import tee

![tee](http://4.bp.blogspot.com/-u_KYBwIUyF4/UUR5cvbv6PI/AAAAAAAAAXs/hPJT0ZR5iBc/s1600/tee_diagram.png)

In [None]:
iterator1, iterator2 = tee(range(3), 2)

In [None]:
for elem in iterator1:
    print(elem, end=' ')

for elem in iterator2:
    print(elem, end=' ')

### [itertools.groupby](https://docs.python.org/3/library/itertools.html#itertools.groupby)

In [None]:
from itertools import groupby

In [None]:
for key, group in groupby('AABBCCDAAB'): 
    print(key, list(group))

In [None]:
words = ['cab', 'face', 'cafe', 'abc', 'goon']
words = sorted(words, key=sorted)
words

In [None]:
for key, group in groupby(words, key=sorted): 
    print(','.join(key), list(group))

# Генераторы

In [None]:
from collections.abc import Iterator

def countdown(n: int) -> Iterator[int]:
    print(f'Counting down from {n}')
    for i in range(n, 0, -1):
        # Can do anything here
        yield i
    print('Done')

In [None]:
for i in countdown(5):
    print(i)

In [None]:
countdown

In [None]:
counter = countdown(10)  # you don't immediately get the result
counter

In [None]:
iter(counter) is counter

In [None]:
counter = countdown(2)

In [None]:
next(counter)

In [None]:
next(counter)

In [None]:
next(counter)

### [Generator](https://docs.python.org/3/glossary.html#term-generator)

Это специальный итератор, который получается в результате вызова функции, содержащей ключевое слово `yield`.

Последовательность значений, которую возвращает генератор, задается последовательностью операторов `yield` в теле функции.

### Примеры

In [None]:
def squares(size: int) -> Iterator[int]:
    for i in range(size):
        yield i ** 2

In [None]:
generator = squares(5)

In [None]:
# from dis import dis
# dis(generator.gi_frame.f_code)

In [None]:
generator = squares(5)

for elem in generator:
    print(elem, end=' ')
    assert generator.gi_frame.f_lineno == 3

In [None]:
for elem in generator:  # Impossible to re-start generator; just make a new one if needed
    print(elem, end=' ')
    
assert generator.gi_frame is None

Генераторы истощаются!

In [None]:
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar('T')

def unique_ordered(elements: Iterable[T]) -> Iterator[T]:
    seen = set()
    for elem in elements:
        if elem in seen:
            continue
        seen.add(elem)
        yield elem

In [None]:
for elem in unique_ordered([1, 2, 3, 1, 2, 4]):
    print(elem, end=' ')

### Цепочка генераторов

In [None]:
def sum_of_squares_of_even(iterable: Iterable[int]) -> int:
    sum_ = 0
    for i in iterable:
        if i % 2 != 0:
            continue
        sum_ += i ** 2
    return sum_

In [None]:
sum_of_squares_of_even(range(10))

In [None]:
def even(iterable: Iterable[int]) -> list[int]:
    result = []
    for i in iterable:
        if i % 2 != 0:
            continue
        result.append(i)
    return result

In [None]:
def squares(iterable: Iterable[int]) -> list[int]:
    result = []
    for i in iterable:
        result.append(i ** 2)
    return result

In [None]:
sum(squares(even(range(10))))  # max: 10 ints

In [None]:
def even(iterable: Iterable[int]) -> Iterator[int]:
    for elem in iterable:
        if elem % 2 == 0:
            yield elem

In [None]:
def squares(iterable: Iterable[int]) -> Iterator[int]:
    for elem in iterable:
        yield elem ** 2

In [None]:
sum(squares(even(range(10))))  # max: 2 ints

Цепочка генераторов позволяет легко декомпозировать алгоритм без существенных затрат памяти.

### Генераторные выражения ([generator expression](https://docs.python.org/3/glossary.html#term-generator-expression))

In [None]:
squares = (x ** 2 for x in range(5))
squares

In [None]:
for square in squares:
    print(square, end=' ')

In [None]:
%%python3

import resource
import time
t0 = time.perf_counter()
print(max(x for x in range(1_000_000_000) if x % 11 == 0))
print(f"{resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MB RAM, {time.perf_counter() - t0} secs")

In [None]:
import sys

int_size_bytes = sys.getsizeof(0)
int_count = 1_000_000_000 / 11
list_size_bytes = int_size_bytes * int_count
list_size_megabytes = list_size_bytes / (1024 ** 2)
list_size_megabytes

In [None]:
%%python3

import resource
import time
t0 = time.perf_counter()
print(max([x for x in range(1_000_000_000) if x % 11 == 0]))
print(f"{resource.getrusage(resource.RUSAGE_SELF).ru_maxrss // 1024} MB RAM, {time.perf_counter() - t0} secs")

In [None]:
# listcomp > list(genexpr) bc of resizes
# sum(genexpr) > sum(listcomp)

### Генераторы в качестве итераторов

In [None]:
from collections.abc import Iterator
from dataclasses import dataclass

@dataclass
class BinaryTreeNode:
    value: int
    left: 'BinaryTreeNode | None' = None
    right: 'BinaryTreeNode | None' = None

    def __iter__(self) -> Iterator[int]:  # in-order
        for value in (self.left or ()):
            yield value

        yield self.value

        for value in (self.right or ()):
            yield value

In [None]:
tree = BinaryTreeNode(
    left=BinaryTreeNode(
        left=BinaryTreeNode(value=1),
        value=2,
    ),
    value=3,
    right=BinaryTreeNode(
        value=4,
        right=BinaryTreeNode(value=5),
    ),
)

In [None]:
for value in tree:
    print(value, end=' ')

<img src="https://i.imgur.com/mM6OdDr.png" alter="agents-of-yield" width=900/>

## Генераторы: продвинутое использование

#### Inspired by: http://dabeaz.com/finalgenerator/

In [None]:
from collections.abc import Iterator, Generator

In [None]:
def create_generator() -> Iterator[int]:
    yield 5

In [None]:
def create_generator() -> Generator[int, None, None]:
    yield 5

In [None]:
def create_duplicator() -> Generator[int, int, None]:
    print('Give me a value, please')
    value = yield  # ???
    print(f'Got value: {value}')
    yield value * 2
    print('Already finished')

In [None]:
dublicator = create_duplicator()
next(dublicator)  # send(None)

In [None]:
dublicator.send(21)

In [None]:
dublicator.send(100500)

### [yield как выражение](https://docs.python.org/3/reference/simple_stmts.html#yield)

![yield-expr](https://i0.wp.com/storage.googleapis.com/ssivart/super9-blog/priming-generator.png?w=1200&ssl=1)

In [None]:
def jumping_counter(upto: int) -> Generator[int, int, None]:
    count = 1
    while count <= upto:
        jump = yield count
        count += jump or 1

In [None]:
generator = jumping_counter(3)

In [None]:
next(generator)  # equals to .send(None)

In [None]:
generator.send(2)

In [None]:
generator.send(-1)

In [None]:
next(generator)

In [None]:
next(generator)

### [throw](https://docs.python.org/3/reference/expressions.html#generator.throw)

In [None]:
generator = jumping_counter(5)

In [None]:
next(generator)

In [None]:
generator.throw(RuntimeError('алё)'))

### [close](https://docs.python.org/3/reference/expressions.html#generator.close)

In [None]:
generator = jumping_counter(5)

In [None]:
next(generator)

In [None]:
generator.close()

In [None]:
next(generator)

### Обработка close

In [None]:
def create_generator() -> Iterator[int]:
    while True:
        try:
            yield 59
        except GeneratorExit:  # can't be ignored
            pass # but let's ignore it and see what happens
#             print('Exiting...')
#             return

In [None]:
generator = create_generator()

In [None]:
next(generator)

In [None]:
generator.close()

### [@contextmanager](https://docs.python.org/3/library/contextlib.html#contextlib.contextmanager)

In [None]:
from contextlib import contextmanager
import tempfile
import shutil

In [None]:
@contextmanager
def tempdir():
    dirname = tempfile.mkdtemp()
    try:
        yield dirname
    finally:
        shutil.rmtree(dirname)

In [None]:
with tempdir() as path:
    print(path)

In [None]:
# More precise implementation:
# https://github.com/python/cpython/blob/b3f0ceae919c1627094ff628c87184684a5cedd6/Lib/contextlib.py#L123

class _GeneratorContextManager:
    def __init__(self, func, args, kwargs):
        self.gen = func(*args, **kwargs)
    
    def __enter__(self):
        return next(self.gen)

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if exc_type is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            raise RuntimeError("generator didn't stop")
        else:
            try:
                self.gen.throw(exc_type, exc_value, exc_traceback)
            except BaseException:
                return False
            raise RuntimeError("generator didn't stop after throw()")

def contextmanager(func):
    def helper(*args, **kwargs):
        return _GeneratorContextManager(func, args, kwargs)
    return helper

### [yield from](https://peps.python.org/pep-0380/)

In [None]:
from collections.abc import Iterable
from typing import TypeVar

T = TypeVar('T')

def repeat(times: int, iterable: Iterable[T]) -> T:
    for _ in range(times):
        yield from iterable  # https://www.python.org/dev/peps/pep-0380/

In [None]:
for elem in repeat(5, [1, 2, 3]):
    print(elem, end=' ')

In [None]:
def repeat(times: int, iterable: T) -> T:
    for _ in range(times):
        yield iterable

In [None]:
for elem in repeat(5, [1, 2, 3]):
    print(elem, end=' ')

In [None]:
from collections.abc import Iterator
from dataclasses import dataclass

@dataclass
class BinaryTreeNode:
    value: int
    left: 'BinaryTreeNode | None' = None
    right: 'BinaryTreeNode | None' = None

    def __iter__(self) -> Iterator[int]:
        yield from self.left or ()
        yield self.value
        yield from self.right or ()

In [None]:
tree = BinaryTreeNode(
    left=BinaryTreeNode(
        left=BinaryTreeNode(value=1),
        value=2,
    ),
    value=3,
    right=BinaryTreeNode(
        value=4,
        right=BinaryTreeNode(value=5),
    ),
)

In [None]:
for value in tree:
    print(value, end=' ')

### [return в генераторах](https://peps.python.org/pep-0255/#specification-return)

In [None]:
def create_generator() -> Generator[int, None, str]:
    print('Generator initialized')
    yield 52
    print('Generator has yielded')
    return "Saint-Petersburg"

In [None]:
generator = create_generator()
next(generator)
next(generator)

In [None]:
def generator_wrapper():
    result = yield from create_generator()
    print(result)

In [None]:
list(generator_wrapper())

## Генераторы в качестве корутин

### Многозадачность

![multitasking](https://miro.medium.com/max/1580/1*r94wLYporfXxgaIakEfBIA.png)

А что, если использовать генератор в качестве корутины?

У генератора уже есть необходимый нам интерфейс:

In [None]:
def pinger() -> Generator[None, None, None]:
    while True:
        print('ping')
        yield

In [None]:
def ponger() -> Generator[None, None, None]:
    while True:
        print('pong')
        yield

Давайте напишем простенький планировщик задач (scheduler):

In [None]:
from collections import deque
from time import sleep

In [None]:
def run(*tasks: Generator[None, None, None]) -> None:
    try:
        # code here...
    except KeyboardInterrupt:
        return

In [None]:
run(pinger(), ponger())

In [None]:
# ЗАПУСКАЕМ
# ░ГУСЯ░▄▀▀▀▄░РАБОТЯГИ░░
# ▄███▀░◐░░░▌░░░░░░░
# ░░░░▌░░░░░▐░░░░░░░
# ░░░░▐░░░░░▐░░░░░░░
# ░░░░▌░░░░░▐▄▄░░░░░
# ░░░░▌░░░░▄▀▒▒▀▀▀▀▄
# ░░░▐░░░░▐▒▒▒▒▒▒▒▒▀▀▄
# ░░░▐░░░░▐▄▒▒▒▒▒▒▒▒▒▒▀▄
# ░░░░▀▄░░░░▀▄▒▒▒▒▒▒▒▒▒▒▀▄
# ░░░░░░▀▄▄▄▄▄█▄▄▄▄▄▄▄▄▄▄▄▀▄
# ░░░░░░░░░░░▌▌▌▌░░░░░
# ░░░░░░░░░░░▌▌░▌▌░░░░░
# ░░░░░░░░░▄▄▌▌▄▌▌░░░░░

In [None]:
def run(*tasks: Generator[None, None, None]) -> None:
    task_queue = deque(tasks)
    try:
        while task_queue:
            sleep(0.5)
            task = task_queue.popleft()
            try:
                task.send(None)
            except StopIteration:
                continue
            task_queue.append(task)
    except KeyboardInterrupt:
        return

In [None]:
run(pinger(), ponger())

Кроме того, эту идею можно развить, и возвращать из корутины специальный объект, описывающий действие, которое должен совершить планировщик.

Например, создать новую задачу:

In [None]:
def spawner() -> Generator[Task, Any, None]:
    print('Spawn new task')
    task_id = yield NewTask(pinger())
    for _ in range(5):
        print('tick')
        yield
    yield KillTask(task_id)
    print('Task killed')

Именно это вам и нужно будет реализовать в домашней задаче `pyos`

# Спасибо за внимание!