Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 42 additions & 21 deletions docs/pybeamline/integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,65 @@ In this case, we built two logs (`log_original` and `log_after_drift`) which inc
After that we can use the capabilities of pyBeamline and reactivex to construct a pipeline that produce a sequence of frequencies corresponding to the frequency of directly follows relation `BC` in window with length 40 (which is chosen as all our traces have length 4). Also note that we leverage the fact that in all our events when `B` and `C` appear they are always in the same trace (because of how `log_source` generates the observable). We will later define a function `check_for_drift`:

```python
import reactivex
from reactivex import operators as ops

log_with_drift.pipe(
ops.buffer_with_count(40),
ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
RxOperator(ops.buffer_with_count(40)),
RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
ops.pairwise(),
ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"),
ops.count()
)
)
).subscribe(lambda x: print(x))
)),
).sink(print_sink())
```

After this we can define our function for drift detection and collection of points and drift indexes using:
```python
from reactivex import operators as ops
import reactivex

from pybeamline.stream.rx_operator import RxOperator
from pybeamline.stream.base_sink import BaseSink
from typing import Optional, List
from pybeamline.stream.base_map import BaseMap
from river import drift
from reactivex import operators as ops

drift_detector = drift.ADWIN()
data = []
drifts = []
class CheckForDrift(BaseMap[int, int]):
def __init__(self):
self.drift_detector = drift.ADWIN()
self.drifts = []
self.index = 0

def check_for_drift():
index = 0
def transform(self, value: int) -> Optional[List[int]]:
self.drift_detector.update(value)
self.index += 1
if self.drift_detector.drift_detected:
self.drifts.append(self.index)
return [value]

def _process(x):
nonlocal index
drift_detector.update(x)
index = index + 1
if drift_detector.drift_detected:
drifts.append(index)
class CollectSink(BaseSink[int]):
def __init__(self):
self.data = []

def _check_for_drift(obs):
return obs.pipe(ops.do_action(lambda value: _process(value)))
def consume(self, item: int) -> None:
self.data.append(item)

return _check_for_drift

drift_detector = CheckForDrift()
collector = CollectSink()

log_with_drift.pipe(RxOperator(ops.buffer_with_count(40)),
RxOperator(ops.flat_map(lambda events: reactivex.from_iterable(events).pipe(
ops.pairwise(),
ops.filter(lambda x: x[0].get_trace_name() == x[1].get_trace_name() and x[0].get_event_name() == "B" and x[1].get_event_name() == "C"),
ops.count()
)
)),
drift_detector
).sink(collector)
```
With this function available, `check_for_drift` can now be piped to the previous computation. Plotting the frequencies and the concept drifts will result in the following:
With this class available, `CheckForDrift` can now be piped to the previous computation. Plotting the frequencies and the concept drifts will result in the following:

![](https://github.com/beamline/docs/blob/main/site/img/drifts.png?raw=true)

Expand Down