# ⚠ Warning

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gl/OpinionatedGeek%2Fmango-explorer/HEAD?filepath=Observables.ipynb) _🏃‍♀️ To run this notebook press the ⏩ icon in the toolbar above._

[🥭 Mango Markets](https://mango.markets/) support is available at: [Docs](https://docs.mango.markets/) | [Discord](https://discord.gg/67jySBhxrg) | [Twitter](https://twitter.com/mangomarkets) | [Github](https://github.com/blockworks-foundation) | [Email](mailto:hello@blockworks.foundation)

# 🥭 Observables

This notebook contains some useful shared tools to work with [RX Observables](https://rxpy.readthedocs.io/en/latest/reference_observable.html).


In [None]:
import logging
import rx
import rx.operators as ops
import typing


# PrintingObserverSubscriber class

This class can subscribe to an `Observable` and print out each item.

In [None]:
class PrintingObserverSubscriber(rx.core.typing.Observer):
    def __init__(self, report_no_output: bool) -> None:
        self.report_no_output = report_no_output

    def on_next(self, item: typing.Any) -> None:
        self.report_no_output = False
        print(item)

    def on_error(self, ex: Exception) -> None:
        self.report_no_output = False
        print(ex)

    def on_completed(self) -> None:
        if self.report_no_output:
            print("No items to show.")


# CollectingObserverSubscriber class

This class can subscribe to an `Observable` and collect each item.

In [None]:
class CollectingObserverSubscriber(rx.core.typing.Observer):
    def __init__(self) -> None:
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self.collected: typing.List[typing.Any] = []

    def on_next(self, item: typing.Any) -> None:
        self.collected += [item]

    def on_error(self, ex: Exception) -> None:
        self.logger.error(f"Received error: {ex}")

    def on_completed(self) -> None:
        pass


# CaptureFirstItem class

This captures the first item to pass through the pipeline, allowing it to be instpected later.

In [None]:
class CaptureFirstItem:
    def __init__(self):
        self.captured: typing.Any = None
        self.has_captured: bool = False

    def capture_if_first(self, item: typing.Any) -> typing.Any:
        if not self.has_captured:
            self.captured = item
            self.has_captured = True

        return item


# debug_print_item function

This is a handy item that can be added to a pipeline to show what is being passed at that particular stage. For example, this shows how to print the item before and after filtering:
```
fetch().pipe(
    ops.map(debug_print_item("Unfiltered:")),
    ops.filter(lambda item: item.something is not None),
    ops.map(debug_print_item("Filtered:")),
    ops.filter(lambda item: item.something_else()),
    ops.map(act_on_item)
).subscribe(some_subscriber)
```

In [None]:
def debug_print_item(title: str) -> typing.Callable[[typing.Any], typing.Any]:
    def _debug_print_item(item: typing.Any) -> typing.Any:
        print(title, item)
        return item
    return _debug_print_item


# Events

A strongly(ish)-typed event source that can handle many subscribers.

In [None]:
TEventDatum = typing.TypeVar('TEventDatum')


class EventSource(typing.Generic[TEventDatum], rx.subject.Subject):
    def __init__(self) -> None:
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)

    def on_next(self, value: typing.Type[TEventDatum]) -> None:
        super().on_next(value)

    def on_error(self, ex: Exception) -> None:
        super().on_error(ex)

    def on_completed(self) -> None:
        super().on_completed()

    def publish(self, value: typing.Type[TEventDatum]) -> None:
        self.on_next(value)

    def dispose(self) -> None:
        super().dispose()


# 🏃 Running

A few quick examples to show how to use these functions

In [None]:
if __name__ == "__main__":
    rx.from_([1, 2, 3, 4, 5]).subscribe(PrintingObserverSubscriber(False))
    rx.from_([1, 2, 3, 4, 5]).pipe(
        ops.filter(lambda item: (item % 2) == 0),
    ).subscribe(PrintingObserverSubscriber(False))

    collector = CollectingObserverSubscriber()
    rx.from_(["a", "b", "c"]).subscribe(collector)
    print(collector.collected)

    rx.from_([1, 2, 3, 4, 5]).pipe(
        ops.map(debug_print_item("Before even check:")),
        ops.filter(lambda item: (item % 2) == 0),
        ops.map(debug_print_item("After even check:")),
    ).subscribe(PrintingObserverSubscriber(True))
