Skip to content

Commit

Permalink
Stream: remove __repr__
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Feb 11, 2024
1 parent 52835a7 commit 9bf816d
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 96 deletions.
2 changes: 1 addition & 1 deletion streamable/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def wrap(arg):
return wrap


def raise_from(
def reraise_as(
func: Callable[[T], R], source: Type[Exception], target: Type[Exception]
) -> Callable[[T], R]:
def wrap(arg):
Expand Down
6 changes: 3 additions & 3 deletions streamable/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def catch(
predicate: Callable[[Exception], Any] = bool,
raise_at_exhaustion: bool = False,
) -> Iterator[T]:
predicate = _util.raise_from(
predicate = _util.reraise_as(
predicate, source=StopIteration, target=WrappedStopIteration
)
return _CatchingIterator(
Expand Down Expand Up @@ -388,7 +388,7 @@ def group(
if by is None:
by = lambda _: None
else:
by = _util.raise_from(by, StopIteration, WrappedStopIteration)
by = _util.reraise_as(by, StopIteration, WrappedStopIteration)
if size is None:
size = cast(int, float("inf"))
return _GroupingIterator(iterator, size, seconds, by)
Expand All @@ -398,7 +398,7 @@ def map(
func: Callable[[T], U], iterator: Iterator[T], concurrency: int = 1
) -> Iterator[U]:
_util.validate_concurrency(concurrency)
func = _util.raise_from(func, StopIteration, WrappedStopIteration)
func = _util.reraise_as(func, StopIteration, WrappedStopIteration)
if concurrency == 1:
return builtins.map(func, iterator)
else:
Expand Down
35 changes: 2 additions & 33 deletions streamable/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from streamable._util import (
LOGGER,
get_name,
validate_concurrency,
validate_group_seconds,
validate_group_size,
Expand Down Expand Up @@ -71,9 +70,9 @@ def __add__(self, other: "Stream[T]") -> "Stream[T]":
return cast(Stream[T], Stream([self, other].__iter__).flatten())

def __iter__(self) -> Iterator[T]:
from streamable.visitors.iteration import IterationVisitor
from streamable.visitors.iterator import IteratorVisitor

return self.accept(IterationVisitor[T]())
return self.accept(IteratorVisitor[T]())

def exhaust(self) -> int:
"""
Expand All @@ -84,9 +83,6 @@ def exhaust(self) -> int:
"""
return sum(1 for _ in self)

def __repr__(self) -> str:
return f"Stream(source={get_name(self._source)})"

def accept(self, visitor: "Visitor[V]") -> V:
"""
Entry point to visit this stream (en.wikipedia.org/wiki/Visitor_pattern).
Expand Down Expand Up @@ -343,9 +339,6 @@ def __init__(
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_catch_stream(self)

def __repr__(self) -> str:
return f"CatchStream(upstream={get_name(self.upstream)}, predicate={get_name(self.predicate)}, raise_at_exhaustion={self.raise_at_exhaustion})"


class FilterStream(DownStream[T, T]):
def __init__(self, upstream: Stream[T], predicate: Callable[[T], Any]):
Expand All @@ -355,9 +348,6 @@ def __init__(self, upstream: Stream[T], predicate: Callable[[T], Any]):
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_filter_stream(self)

def __repr__(self) -> str:
return f"FilterStream(upstream={get_name(self.upstream)}, predicate={get_name(self.predicate)})"


class FlattenStream(DownStream[Iterable[T], T]):
def __init__(self, upstream: Stream[Iterable[T]], concurrency: int) -> None:
Expand All @@ -367,9 +357,6 @@ def __init__(self, upstream: Stream[Iterable[T]], concurrency: int) -> None:
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_flatten_stream(self)

def __repr__(self) -> str:
return f"FlattenStream(upstream={get_name(self.upstream)}, concurrency={self.concurrency})"


class ForeachStream(DownStream[T, T]):
def __init__(self, upstream: Stream[T], func: Callable[[T], Any], concurrency: int):
Expand All @@ -380,9 +367,6 @@ def __init__(self, upstream: Stream[T], func: Callable[[T], Any], concurrency: i
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_foreach_stream(self)

def __repr__(self) -> str:
return f"ForeachStream(upstream={get_name(self.upstream)}, func={get_name(self.func)}, concurrency={self.concurrency})"


class GroupStream(DownStream[T, List[T]]):
def __init__(
Expand All @@ -400,9 +384,6 @@ def __init__(
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_group_stream(self)

def __repr__(self) -> str:
return f"GroupStream(upstream={get_name(self.upstream)}, size={self.size}, seconds={self.seconds}, by={self.by})"


class LimitStream(DownStream[T, T]):
def __init__(self, upstream: Stream[T], count: int) -> None:
Expand All @@ -412,9 +393,6 @@ def __init__(self, upstream: Stream[T], count: int) -> None:
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_limit_stream(self)

def __repr__(self) -> str:
return f"LimitStream(upstream={get_name(self.upstream)}, count={self.count})"


class MapStream(DownStream[T, U]):
def __init__(self, upstream: Stream[T], func: Callable[[T], U], concurrency: int):
Expand All @@ -425,9 +403,6 @@ def __init__(self, upstream: Stream[T], func: Callable[[T], U], concurrency: int
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_map_stream(self)

def __repr__(self) -> str:
return f"MapStream(upstream={get_name(self.upstream)}, func={get_name(self.func)}, concurrency={self.concurrency})"


class ObserveStream(DownStream[T, T]):
def __init__(self, upstream: Stream[T], what: str, colored: bool):
Expand All @@ -438,9 +413,6 @@ def __init__(self, upstream: Stream[T], what: str, colored: bool):
def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_observe_stream(self)

def __repr__(self) -> str:
return f"ObserveStream(upstream={get_name(self.upstream)}, what='{self.what}', colored={self.colored})"


class SlowStream(DownStream[T, T]):
def __init__(self, upstream: Stream[T], frequency: float):
Expand All @@ -449,6 +421,3 @@ def __init__(self, upstream: Stream[T], frequency: float):

def accept(self, visitor: "Visitor[V]") -> V:
return visitor.visit_slow_stream(self)

def __repr__(self) -> str:
return f"SlowStream(upstream={get_name(self.upstream)}, frequency={self.frequency})"
30 changes: 15 additions & 15 deletions streamable/visitor.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from streamable import stream

V = TypeVar("V")


class Visitor(Generic[V]):
def visit_any(self, stream: stream.Stream) -> V:
raise NotImplementedError()
class Visitor(ABC, Generic[V]):
# fmt: off
@abstractmethod
def visit_stream(self, stream: stream.Stream) -> V: ...
# fmt: on

def visit_catch_stream(self, stream: stream.CatchStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_filter_stream(self, stream: stream.FilterStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_flatten_stream(self, stream: stream.FlattenStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_foreach_stream(self, stream: stream.ForeachStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_group_stream(self, stream: stream.GroupStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_limit_stream(self, stream: stream.LimitStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_observe_stream(self, stream: stream.ObserveStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_map_stream(self, stream: stream.MapStream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)

def visit_slow_stream(self, stream: stream.SlowStream) -> V:
return self.visit_any(stream)

def visit_stream(self, stream: stream.Stream) -> V:
return self.visit_any(stream)
return self.visit_stream(stream)
59 changes: 52 additions & 7 deletions streamable/visitors/explanation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,70 @@ def __init__(
if self.colored:
self.linking_symbol = _util.colorize_in_grey(self.linking_symbol)

def visit_any(self, stream: stream.Stream) -> str:
def _explanation(self, stream: stream.Stream, attributes_repr: str) -> str:
explanation = self.header

if self.header:
explanation += "\n"
self.header = ""

stream_repr = repr(stream)
name = stream.__class__.__name__
if self.colored:
name, rest = stream_repr.split("(", maxsplit=1)
stream_repr = _util.colorize_in_red(name) + "(" + rest
name = _util.colorize_in_red(name)

stream_repr = f"{name}({attributes_repr})"

explanation += self.linking_symbol + stream_repr + "\n"

upstream = stream.upstream
if upstream is not None:
if stream.upstream is not None:
explanation += textwrap.indent(
upstream.accept(self),
stream.upstream.accept(self),
prefix=" " * self.margin_step,
)

return explanation

def visit_stream(self, stream: stream.Stream) -> str:
return self._explanation(stream, f"source={_util.get_name(stream.source)}")

def visit_catch_stream(self, stream: stream.CatchStream) -> str:
return self._explanation(
stream,
f"predicate={_util.get_name(stream.predicate)}, raise_at_exhaustion={stream.raise_at_exhaustion}",
)

def visit_filter_stream(self, stream: stream.FilterStream) -> str:
return self._explanation(
stream, f"predicate={_util.get_name(stream.predicate)}"
)

def visit_flatten_stream(self, stream: stream.FlattenStream) -> str:
return self._explanation(stream, f"concurrency={stream.concurrency}")

def visit_foreach_stream(self, stream: stream.ForeachStream) -> str:
return self._explanation(
stream,
f"func={_util.get_name(stream.func)}, concurrency={stream.concurrency}",
)

def visit_group_stream(self, stream: stream.GroupStream) -> str:
return self._explanation(
stream, f"size={stream.size}, seconds={stream.seconds}, by={stream.by}"
)

def visit_limit_stream(self, stream: stream.LimitStream) -> str:
return self._explanation(stream, f"count={stream.count}")

def visit_map_stream(self, stream: stream.MapStream) -> str:
return self._explanation(
stream,
f"func={_util.get_name(stream.func)}, concurrency={stream.concurrency}",
)

def visit_observe_stream(self, stream: stream.ObserveStream) -> str:
return self._explanation(
stream, f"what='{stream.what}', colored={stream.colored}"
)

def visit_slow_stream(self, stream: stream.SlowStream) -> str:
return self._explanation(stream, f"frequency={stream.frequency}")
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
U = TypeVar("U")


class IterationVisitor(Visitor[Iterator[T]]):
class IteratorVisitor(Visitor[Iterator[T]]):
def visit_catch_stream(self, stream: CatchStream[T]) -> Iterator[T]:
return functions.catch(
stream.upstream.accept(self),
Expand All @@ -29,15 +29,15 @@ def visit_catch_stream(self, stream: CatchStream[T]) -> Iterator[T]:

def visit_filter_stream(self, stream: FilterStream[T]) -> Iterator[T]:
return filter(
_util.raise_from(
_util.reraise_as(
stream.predicate, StopIteration, functions.WrappedStopIteration
),
cast(Iterable[T], stream.upstream.accept(self)),
)

def visit_flatten_stream(self, stream: FlattenStream[T]) -> Iterator[T]:
return functions.flatten(
stream.upstream.accept(IterationVisitor[Iterable]()),
stream.upstream.accept(IteratorVisitor[Iterable]()),
concurrency=stream.concurrency,
)

Expand All @@ -54,7 +54,7 @@ def visit_group_stream(self, stream: GroupStream[U]) -> Iterator[T]:
return cast(
Iterator[T],
functions.group(
stream.upstream.accept(IterationVisitor[U]()),
stream.upstream.accept(IteratorVisitor[U]()),
stream.size,
stream.seconds,
stream.by,
Expand All @@ -67,20 +67,20 @@ def visit_limit_stream(self, stream: LimitStream[T]) -> Iterator[T]:
stream.count,
)

def visit_map_stream(self, stream: MapStream[U, T]) -> Iterator[T]:
return functions.map(
stream.func,
stream.upstream.accept(IteratorVisitor[U]()),
concurrency=stream.concurrency,
)

def visit_observe_stream(self, stream: ObserveStream[T]) -> Iterator[T]:
return functions.observe(
stream.upstream.accept(self),
stream.what,
stream.colored,
)

def visit_map_stream(self, stream: MapStream[U, T]) -> Iterator[T]:
return functions.map(
stream.func,
stream.upstream.accept(IterationVisitor[U]()),
concurrency=stream.concurrency,
)

def visit_slow_stream(self, stream: SlowStream[T]) -> Iterator[T]:
return functions.slow(stream.upstream.accept(self), stream.frequency)

Expand Down
Loading

0 comments on commit 9bf816d

Please sign in to comment.