### Generators (Advanced)

As we saw in the lecture, **generator expressions** create generator objects using comprehension-like syntax. Functions that use `yield` are another way to create generators; the walkthrough below focuses on expressions, and the exercises at the end use both.

Generators are **iterators**: they implement `__iter__` (which returns the object itself) and `__next__` (to fetch the next value). They raise `StopIteration` when exhausted and are **one-shot** (cannot be reused after exhaustion).
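
To make the protocol concrete, here is a minimal sketch of the same behavior written two ways: by hand as a class, and as a generator function. The names `Squares` and `squares_gen` are hypothetical and exist only for this illustration.

```python
class Squares:
    """Hypothetical hand-written iterator producing i ** 2 for i in range(n)."""

    def __init__(self, n):
        self._i = 0
        self._n = n

    def __iter__(self):
        # An iterator returns itself from __iter__.
        return self

    def __next__(self):
        if self._i >= self._n:
            # Signal exhaustion; every later call raises again.
            raise StopIteration
        value = self._i ** 2
        self._i += 1
        return value


def squares_gen(n):
    """Generator function: Python supplies __iter__ and __next__ for us."""
    for i in range(n):
        yield i ** 2


manual = Squares(5)
assert iter(manual) is manual
assert list(manual) == [0, 1, 4, 9, 16]
assert list(manual) == []  # one-shot, just like a generator

gen = squares_gen(5)
assert iter(gen) is gen
assert list(gen) == [0, 1, 4, 9, 16]
```

The generator version is shorter because the bookkeeping (current position, raising `StopIteration`) is handled by the language.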

In [1]:
squares = (i ** 2 for i in range(5))

Like every iterator, a generator can be consumed only once:

In [2]:
for i in squares:
    print(i)

0
1
4
9
16


`squares` is now an exhausted iterator:

In [3]:
for i in squares:
    print('iterating again...')  # nothing prints

Contrast with a list comprehension (re-iterable):

In [4]:
l = [i ** 2 for i in range(5)]
for i in l:
    print(i)
for i in l:
    print(i)  # prints again, lists are reusable iterables

0
1
4
9
16
0
1
4
9
16


If we want to iterate over `squares` again, we must re-create the generator:

In [5]:
squares = (i ** 2 for i in range(5))

`list(<iterable>)` builds a list by iterating over the iterable. If you pass a generator (or any iterator) to `list()` or `tuple()`, you **will exhaust** it:

In [6]:
squares = (i ** 2 for i in range(5))
print(list(squares))
print(list(squares))  # now empty

[0, 1, 4, 9, 16]
[]


Generators behave like iterators with respect to `iter()` and `next()`:

In [7]:
squares = (i ** 2 for i in range(5))
assert iter(squares) is squares

try:
    while True:
        print(next(squares))
except StopIteration:
    print('finished iterating')

try:
    next(squares)
    raise AssertionError('Expected StopIteration after exhaustion')
except StopIteration:
    print('no more!')

0
1
4
9
16
finished iterating
no more!


**Beware**: Membership testing (e.g., `x in generator`) consumes items until a match is found, or until the generator is exhausted if there is no match. A successful test can therefore leave a **partially consumed** iterator, which can cause subtle bugs.

In [8]:
squares = (i ** 2 for i in range(5))
assert (4 in squares) is True  # consumes 0,1,4 and stops
assert list(squares) == [9, 16]  # only remaining values
print('OK - membership consumption demo')

OK - membership consumption demo
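
The same effect makes repeated membership tests on one generator unreliable. The sketch below shows a second test failing for a value that was present, and one possible workaround (materializing into a `set`, which trades memory for safe repeated lookups):

```python
squares = (i ** 2 for i in range(5))
found_first = 9 in squares    # True: consumes 0, 1, 4, 9
found_second = 4 in squares   # False: 4 is already gone; this also drains 16

# Materializing once makes repeated lookups safe (at the cost of memory).
square_set = set(i ** 2 for i in range(5))
assert 9 in square_set and 4 in square_set
```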


---
## Advanced Exercises (with tests)
All tasks below should be solved **using generators** (generator expressions or functions that `yield`) where appropriate. Each includes assertions that must pass.

### Task 1 — Pipeline with generator expressions
Create a pipeline that, given `n`, produces the sum of squares of numbers in `0..n-1` that are multiples of 3 **or** 5. Use a generator expression pipeline only (no intermediate lists).

In [9]:
def sum_squares_multiples(n):
    # range -> filter (multiples of 3 or 5) -> map (square) -> reduce (sum)
    return sum(i * i for i in range(n) if i % 3 == 0 or i % 5 == 0)

# Tests
assert sum_squares_multiples(0) == 0
assert sum_squares_multiples(1) == 0
assert sum_squares_multiples(10) == (0**2 + 3**2 + 5**2 + 6**2 + 9**2)  # 0+9+25+36+81=151
assert sum_squares_multiples(10) == 151
print('OK - 1')

OK - 1
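
The solution above fuses filtering and mapping into one expression. For comparison, here is a sketch of the same computation written as separate chained stages; the function name `sum_squares_multiples_staged` is hypothetical. No stage does any work until `sum` starts pulling values through the chain.

```python
def sum_squares_multiples_staged(n):
    """Hypothetical staged variant: each stage is its own lazy generator."""
    numbers = range(n)
    multiples = (i for i in numbers if i % 3 == 0 or i % 5 == 0)  # filter
    squared = (i * i for i in multiples)                          # map
    return sum(squared)                                           # reduce


assert sum_squares_multiples_staged(10) == 151
```

Naming the stages costs a few lines but makes it easy to insert, remove, or test an individual step.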


### Task 2 — `take(n, iterable)`
Implement a generator-friendly `take(n, iterable)` that returns a **list** of the first `n` items from any iterable without over-consuming it (i.e., it stops exactly at `n`). Do not convert the entire iterable to a list.

In [10]:
def take(n, iterable):
    out = []
    it = iter(iterable)
    for _ in range(max(0, n)):
        try:
            out.append(next(it))
        except StopIteration:
            break
    return out

# Tests
g = (i for i in range(10))
assert take(3, g) == [0, 1, 2]
assert take(0, g) == []
assert take(2, g) == [3, 4]
assert take(10, (i for i in range(5))) == [0,1,2,3,4]
print('OK - 2')

OK - 2
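
For comparison, the standard library's `itertools.islice` gives the same behavior in one line. This is a sketch, not a replacement for the exercise; `take_islice` is a hypothetical name.

```python
from itertools import islice


def take_islice(n, iterable):
    """Hypothetical islice-based variant of take()."""
    # islice raises ValueError for a negative stop, so clamp to match take().
    return list(islice(iterable, max(0, n)))


g = (i for i in range(10))
assert take_islice(3, g) == [0, 1, 2]
assert take_islice(0, g) == []
assert take_islice(2, g) == [3, 4]  # islice did not over-consume
```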


### Task 3 — `chunked(iterable, size)`
Write a **generator** that yields lists (chunks) of length up to `size` from `iterable`. The last chunk may be shorter. Do not materialize the entire iterable in memory. (`size >= 1`)

In [11]:
def chunked(iterable, size):
    if size < 1:
        raise ValueError('size must be >= 1')
    it = iter(iterable)
    while True:
        chunk = []
        try:
            for _ in range(size):
                chunk.append(next(it))
        except StopIteration:
            if chunk:
                yield chunk
            break
        yield chunk

# Tests
assert list(chunked(range(7), 3)) == [[0,1,2], [3,4,5], [6]]
assert list(chunked([], 3)) == []
assert list(chunked('abcdef', 2)) == [['a','b'], ['c','d'], ['e','f']]
print('OK - 3')

OK - 3
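
An alternative sketch builds each chunk with `itertools.islice` instead of an inner `try`/`except`. It assumes Python 3.8 or later for the `:=` operator, and `chunked_islice` is a hypothetical name.

```python
from itertools import islice


def chunked_islice(iterable, size):
    """Hypothetical islice-based variant of chunked()."""
    if size < 1:
        raise ValueError('size must be >= 1')
    it = iter(iterable)
    # islice pulls at most `size` items; an empty list means `it` is exhausted.
    while chunk := list(islice(it, size)):
        yield chunk


assert list(chunked_islice(range(7), 3)) == [[0, 1, 2], [3, 4, 5], [6]]
assert list(chunked_islice([], 3)) == []
```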


### Task 4 — `sliding_window(iterable, size)`
Implement a **generator** that yields consecutive overlapping windows (as tuples) of length `size`. For example, `sliding_window('abcd', 3)` yields `('a','b','c')`, then `('b','c','d')`. Windows are yielded only when they are **full**. Use O(size) memory. (`size >= 1`)

In [12]:
from collections import deque

def sliding_window(iterable, size):
    if size < 1:
        raise ValueError('size must be >= 1')
    it = iter(iterable)
    dq = deque(maxlen=size)
    # fill first window
    for _ in range(size):
        try:
            dq.append(next(it))
        except StopIteration:
            return
    yield tuple(dq)
    for x in it:
        dq.append(x)
        yield tuple(dq)

# Tests
assert list(sliding_window('abcd', 3)) == [('a','b','c'), ('b','c','d')]
assert list(sliding_window([1,2,3,4], 2)) == [(1,2),(2,3),(3,4)]
assert list(sliding_window([1,2], 3)) == []
print('OK - 4')

OK - 4
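
As a usage sketch, overlapping windows make a moving average straightforward. The helper `windows` is a compact equivalent of `sliding_window`, repeated here only so the block runs on its own; both `windows` and `moving_average` are hypothetical names.

```python
from collections import deque


def windows(iterable, size):
    """Compact equivalent of sliding_window, repeated so this block is standalone."""
    if size < 1:
        raise ValueError('size must be >= 1')
    dq = deque(maxlen=size)
    for x in iterable:
        dq.append(x)  # the oldest item is dropped automatically once full
        if len(dq) == size:
            yield tuple(dq)


def moving_average(iterable, size):
    """Lazily yield the mean of each full window."""
    return (sum(w) / size for w in windows(iterable, size))


assert list(moving_average([1, 2, 3, 4, 5], 3)) == [2.0, 3.0, 4.0]
```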


### Task 5 — `distinct_justseen(iterable, key=None)`
Yield an item **only when its key differs from the key of the previous item** (the first item is always yielded), mirroring the group boundaries of `itertools.groupby`. This is not global uniqueness: only runs of consecutive equal keys collapse to one item. Always yield the **original item**, not its key.

Examples:
* `distinct_justseen('AAABCCDDDAA') -> A, B, C, D, A`
* With `key=str.lower`, `['a','A','a'] -> 'a'` (only first is emitted because the next two are consecutive with same lowercase key).

In [13]:
def distinct_justseen(iterable, key=None):
    it = iter(iterable)
    try:
        first = next(it)
    except StopIteration:
        return
    kf = key(first) if key is not None else first
    yield first
    for x in it:
        kx = key(x) if key is not None else x
        if kx != kf:
            yield x
            kf = kx

# Tests
assert ''.join(distinct_justseen('AAABCCDDDAA')) == 'ABCDA'
assert list(distinct_justseen([1,1,2,2,2,3,1])) == [1,2,3,1]
assert list(distinct_justseen(['a','A','a'], key=str.lower)) == ['a']
print('OK - 5')

OK - 5
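
Since the task is defined in terms of `itertools.groupby` boundaries, the same result can be sketched directly with `groupby` by emitting the first item of each group. The name `distinct_justseen_groupby` is hypothetical.

```python
from itertools import groupby


def distinct_justseen_groupby(iterable, key=None):
    """Hypothetical groupby-based variant of distinct_justseen()."""
    # groupby starts a new group whenever the key changes;
    # every group is non-empty, so next(group) always succeeds.
    for _key, group in groupby(iterable, key):
        yield next(group)


assert ''.join(distinct_justseen_groupby('AAABCCDDDAA')) == 'ABCDA'
assert list(distinct_justseen_groupby(['a', 'A', 'a'], key=str.lower)) == ['a']
```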


### Task 6 — `ncycles(iterable, n)` (finite cycle)
Repeat the **entire** iterable `n` times (like a finite version of `itertools.cycle`).

Requirements:
- Your function must be a **generator**.
- It should work for both **re-iterable** inputs (e.g., lists, ranges) and **single-use iterators** (e.g., generators). For single-use iterators, you'll need to **cache** the elements once (but only once).
- Do not rely on `itertools.cycle` directly.

In [14]:
def ncycles(iterable, n):
    if n < 0:
        raise ValueError('n must be >= 0')
    if n == 0:
        return
    # Detect if 'iterable' is a single-use iterator (iter(obj) is obj)
    it_probe = iter(iterable)
    is_single_use_iterator = (it_probe is iterable)
    if is_single_use_iterator:
        # Cache once, then replay n times
        cache = list(it_probe)  # consume the iterator
        for _ in range(n):
            for x in cache:
                yield x
    else:
        # Re-iterable: iterate n times directly without caching
        for _ in range(n):
            for x in iterable:
                yield x

# Tests (re-iterable)
assert list(ncycles([1,2], 3)) == [1,2,1,2,1,2]
assert list(ncycles(range(3), 2)) == [0,1,2,0,1,2]

# Tests (single-use iterator)
g = (i for i in [10,20])
assert list(ncycles(g, 3)) == [10,20,10,20,10,20]
print('OK - 6')

OK - 6
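
One detail worth knowing: because `ncycles` is a generator function, its body (including the `n < 0` check) does not run until the first value is requested. The sketch below shows a non-generator variant that validates at call time and always caches the input once. It returns an iterator rather than being a generator, so it does not strictly meet the task's requirements and is shown for comparison only; `ncycles_eager` is a hypothetical name.

```python
from itertools import chain, repeat


def ncycles_eager(iterable, n):
    """Hypothetical variant: validate now, cache once, replay n times."""
    if n < 0:
        # A plain function runs this check at call time.
        raise ValueError('n must be >= 0')
    cache = tuple(iterable)  # consumes single-use iterators exactly once
    return chain.from_iterable(repeat(cache, n))


assert list(ncycles_eager([1, 2], 3)) == [1, 2, 1, 2, 1, 2]
assert list(ncycles_eager((i for i in [10, 20]), 2)) == [10, 20, 10, 20]
assert list(ncycles_eager('ab', 0)) == []

try:
    ncycles_eager([1], -1)  # raises immediately, before any iteration
except ValueError as exc:
    print(exc)  # n must be >= 0
```

The trade-off is memory: this variant copies re-iterable inputs as well, which the solution above avoids.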


### Task 7 — `flatten_once(nested)`
Flatten a **single level** from an iterable of iterables (e.g., `[[1,2],[3],[4,5]] -> 1,2,3,4,5`). Do **not** recursively flatten deeper nesting. Top-level strings and non-iterable items are **atomic**: yield them unchanged. There is one exception to the single-level rule, required by the test: a string found **inside a nested iterable** is split into its characters. Must be a generator.

In [15]:
def _is_iterable(obj):
    try:
        iter(obj)
        return True
    except TypeError:
        return False

def flatten_once(nested):
    for item in nested:
        # Top-level strings are atomic
        if isinstance(item, (str, bytes, bytearray)) or not _is_iterable(item):
            yield item
        else:
            # Flatten one level. Strings inside a nested iterable are split
            # into characters (bytes and bytearray yield integers).
            for inner in item:
                if isinstance(inner, (str, bytes, bytearray)):
                    yield from inner
                else:
                    yield inner

# Tests
assert list(flatten_once([[1,2],[3],[4,5]])) == [1,2,3,4,5]
assert list(flatten_once([(1,2), [3,4]])) == [1,2,3,4]
assert list(flatten_once(['ab', ['cd']])) == ['ab', 'c', 'd']  # top-level 'ab' stays whole; nested 'cd' is split
print('OK - 7')

OK - 7
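
For plain one-level flattening with no special cases, the standard library already provides `itertools.chain.from_iterable`. The sketch below shows where it agrees with `flatten_once` and where it differs, which is why the task needs custom string handling.

```python
from itertools import chain

nested = [[1, 2], [3], [4, 5]]
assert list(chain.from_iterable(nested)) == [1, 2, 3, 4, 5]

# Strings are iterable, so top-level strings are split and nested ones are not:
# the opposite of what flatten_once does.
assert list(chain.from_iterable(['ab', ['cd']])) == ['a', 'b', 'cd']

# Non-iterable top-level items are an error rather than being passed through.
try:
    list(chain.from_iterable([1, [2]]))
except TypeError:
    print('int is not iterable')
```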
