Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Jul 4, 2024
1 parent e92c084 commit 74a4b16
Show file tree
Hide file tree
Showing 15 changed files with 587 additions and 119 deletions.
102 changes: 68 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,86 +84,131 @@ for odd_integer_string in odd_integer_strings:
# 📒 ***Operations***

## `.map`
Applies a function on elements.
Applies a transformation on elements:
```python
integer_strings: Stream[str] = integers.map(str)
negative_integer_strings: Stream[str] = integers.map(lambda n: -n).map(str)

assert list(integer_strings) == ['0', '-1', '-2', '-3', '-4', '-5', '-6', '-7', '-8', '-9']
```

It has an optional `concurrency: int` parameter to execute the function concurrently (threads-based) while preserving the order.

It has a sibling operation called `.amap` to apply an async function concurrently (see section ***`asyncio` support***).

## `.foreach`
Applies a function on elements like `.map` but yields the elements instead of the results.
Applies a side effect on elements:

```python
printed_integers: Stream[int] = integers.foreach(print)
side_effect_performed = []

printed_integers: Stream[int] = integers.foreach(side_effect_performed.append)

assert not side_effect_performed
assert list(printed_integers) == list(integers)
assert side_effect_performed
```

It has an optional `concurrency: int` parameter to execute the function concurrently (threads-based) while preserving the order.

It has a sibling operation called `.aforeach` to apply an async function concurrently (see section ***`asyncio` support***).

## `.filter`
Keeps only elements satisfying a predicate function.
Keeps only the elements that satisfy a condition:

```python
pair_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)

assert list(pair_integers) == [0, 2, 4, 6, 8]
```

## `.group`

Groups elements.
Groups elements into `Iterator`s:

```python
parity_groups: Stream[List[int]] = integers.group(size=100, seconds=4, by=lambda i: i % 2)
integers_5_by_5: Stream[List[int]] = integers.group(size=5)

assert list(integers_5_by_5) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```
```python
integers_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2)

A group is a list of `size` elements for which `by` returns the same value, but it may contain fewer elements in these cases:
- `seconds` have elapsed since the last yield of a group
- upstream is exhausted
- upstream raises an exception
assert list(integers_by_parity) == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```
```python
integers_within_2_seconds: Stream[List[int]] = slow_integers.group(seconds=2)

All the parameters are optional.
assert list(integers_within_2_seconds) == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Combine the `size`/`by`/`seconds` parameters:
```python
integers_2_by_2_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2, size=2)

assert list(integers_2_by_2_by_parity) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]
```

## `.flatten`

Ungroups elements assuming that they are `Iterable`s.

```python
integers: Stream[int] = parity_groups.flatten()
pair_then_odd_integers: Stream[int] = integers_by_parity.flatten()

assert pair_then_odd_integers == [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]
```

It has an optional `concurrency` parameter to flatten several iterables concurrently (threads).
It has an optional `concurrency: int` parameter to flatten several iterables concurrently (threads).

## `.slow`

Limits the rate at which elements are yielded up to a maximum `frequency` (elements per second).
Limits the rate at which elements are yielded up to a maximum number of elements per second:

```python
slow_integers: Stream[int] = integers.slow(frequency=2)

assert list(slow_integers) == list(integers) # takes 10 / 2 = 5 seconds
```

## `.catch`

Catches exceptions that satisfy a predicate function.
Catches a given type of exceptions:

```python
safe_inverse_floats: Stream[float] = (
integers
.map(lambda n: 1 / n)
.catch(lambda error: isinstance(error, ZeroDivisionError))
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError)
)

assert list(safe_inverse_floats) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
```

It has an optional `raise_after_exhaustion` parameter to raise the first catched exception when an iteration ends.
It has an optional `finally_raise: bool` parameter to raise the first catched exception when an iteration ends.

## `.truncate`
Stops the iteration:
- after a given number of yielded elements:
```python
five_first_integers: Stream[int] = integers.truncate(5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]
```

- as soon as a predicate is satisfied:
```python
five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]
```

## `.observe`

Logs the progress of iterations over this stream.
Logs the progress of iterations over this stream:

If you iterate on
```python
observed_slow_integers: Stream[int] = slow_integers.observe(what="integers")
observed_slow_integers: Stream[int] = slow_integers.observe("integers")
```
you will get these logs:
```
Expand All @@ -176,17 +221,6 @@ INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded

The amount of logs will never be overwhelming because they are produced logarithmically e.g. the 11th log will be produced when the iteration reaches the 1024th element.

## `.truncate`
Stops iteration as soon as the `when` predicate is satisfied or `count` elements have been yielded.

```python
five_first_integers: Stream[int] = integers.truncate(5)
```
is equivalent to:
```python
five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)
```

---

# 📦 ***Notes Box***
Expand Down Expand Up @@ -252,8 +286,8 @@ The `Stream`'s methods are also exposed as functions:
```python
from streamable.functions import slow

iterator: Iterator[int] = ...
slow_iterator: Iterator[int] = slow(iterator)
iterator: List[int] = ...
slow_iterator: List[int] = slow(iterator)
```

## visitor pattern
Expand Down
2 changes: 1 addition & 1 deletion README_ETL.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def reverse_etl_example():
)
.foreach(requests.Response.raise_for_status)
.observe("integrated user groups")
.catch(raise_after_exhaustion=True)
.catch(finally_raise=True)
.explain()
.exhaust()
)
Expand Down
Loading

0 comments on commit 74a4b16

Please sign in to comment.