From 35fe871d2142f572fd648704c50a1135aa429cb2 Mon Sep 17 00:00:00 2001 From: Andrea Burattin Date: Mon, 19 Jan 2026 18:06:00 +0100 Subject: [PATCH] Fixed integration page --- docs/pybeamline/integration.md | 63 ++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/docs/pybeamline/integration.md b/docs/pybeamline/integration.md index 181fd04..7cf2973 100644 --- a/docs/pybeamline/integration.md +++ b/docs/pybeamline/integration.md @@ -22,44 +22,65 @@ In this case, we built two logs (`log_original` and `log_after_drift`) which inc After that, we can use the capabilities of pyBeamline and reactivex to construct a pipeline that produces a sequence of frequencies corresponding to the frequency of the directly-follows relation `BC` in a window of length 40 (which is chosen as all our traces have length 4). Also note that we leverage the fact that in all our events when `B` and `C` appear they are always in the same trace (because of how `log_source` generates the observable). 
We will later define a class `CheckForDrift`: ```python -import reactivex from reactivex import operators as ops log_with_drift.pipe( - ops.buffer_with_count(40), - ops.flat_map(lambda events: reactivex.from_iterable(events).pipe( + RxOperator(ops.buffer_with_count(40)), + RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe( ops.pairwise(), ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"), ops.count() ) - ) -).subscribe(lambda x: print(x)) + )), +).sink(print_sink()) ``` + After this we can define our classes for drift detection and collection of points and drift indexes using: ```python -from reactivex import operators as ops +import reactivex + +from pybeamline.stream.rx_operator import RxOperator +from pybeamline.stream.base_sink import BaseSink +from typing import Optional, List +from pybeamline.stream.base_map import BaseMap from river import drift +from reactivex import operators as ops -drift_detector = drift.ADWIN() -data = [] -drifts = [] +class CheckForDrift(BaseMap[int, int]): + def __init__(self): + self.drift_detector = drift.ADWIN() + self.drifts = [] + self.index = 0 -def check_for_drift(): - index = 0 + def transform(self, value: int) -> Optional[List[int]]: + self.drift_detector.update(value) + self.index += 1 + if self.drift_detector.drift_detected: + self.drifts.append(self.index) + return [value] - def _process(x): - nonlocal index - drift_detector.update(x) - index = index + 1 - if drift_detector.drift_detected: - drifts.append(index) +class CollectSink(BaseSink[int]): + def __init__(self): + self.data = [] - def _check_for_drift(obs): - return obs.pipe(ops.do_action(lambda value: _process(value))) + def consume(self, item: int) -> None: + self.data.append(item) - return _check_for_drift + +drift_detector = CheckForDrift() +collector = CollectSink() + +log_with_drift.pipe(RxOperator(ops.buffer_with_count(40)), + 
RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe( + ops.pairwise(), + ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"), + ops.count() + ) + )), + drift_detector +).sink(collector) ``` -With this function available, `check_for_drift` can now be piped to the previous computation. Plotting the frequencies and the concept drifts will result in the following: +With this class available, `CheckForDrift` can now be piped to the previous computation. Plotting the frequencies and the concept drifts will result in the following: ![](https://github.com/beamline/docs/blob/main/site/img/drifts.png?raw=true)