# Streaming Object-Centric Process Mining with `pyBeamline`

pyBeamline is based on ReactiveX and its Python binding RxPY. RxPY is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators in Python. Using pyBeamline it is possible to inject process mining operators into the computation.

This Jupyter notebook presents the main contributions towards Streaming Object-Centric Process Discovery. The rest of the notebook assumes that the `pyBeamline` package is already installed.

The remainder of this document walks through the main functionalities.

It is possible to install the library using:

In [14]:
#!pip install pybeamline # Note: Uncomment this line once merged into the main branch of pyBeamline
# Let's ignore some PM4PY warnings in the notebook
import warnings
warnings.filterwarnings("ignore")

In [15]:
import importlib
import pybeamline
importlib.reload(pybeamline) # NOTE: To be removed once integrated into pyBeamline master

<module 'pybeamline' from 'C:\\Users\\jepmi\\Desktop\\Github\\pybeamline-OCPM\\pybeamline\\__init__.py'>

## Sources

The `pyBeamline` library provides two main object-centric sources:
1. `OCEL 2.0`: Object-Centric Event Log (OCEL) 2.0 source reads an OCEL file and emits object-centric events.
2. `Dictionary Test Source`: A test source that can be initialized with a list of dictionaries, each representing an object-centric event trace.

First, let's consider the test source, which is useful for testing purposes:

In [16]:
from pybeamline.sources.dict_ocel_test_source import dict_test_ocel_source

# Consider two traces with different activities and objects for demonstration.
# IMPORTANT: Object identifiers (e.g., "c1", "o1") must be unique across traces.
# Note: Each generated instance of a trace gets a suffix appended to its object identifiers.
# Within every instance, an object (e.g., "c1") keeps the relationships to the activities defined in the trace structure,
# i.e., object "c1" goes from "Register Customer" to "Create Order", and so on.
trace_1 = [
            {"activity": "Register Customer", "objects": {"Customer": ["c1"]}},
            {"activity": "Create Order", "objects": {"Customer": ["c1"], "Order": ["o1"]}},
            {"activity": "Add Item", "objects": {"Order": ["o1"], "Item": ["i1"]}},
            {"activity": "Add Item", "objects": {"Order": ["o1"], "Item": ["i2"]}},
            {"activity": "Ship Order", "objects": {"Item": ["i1", "i2"], "Order": ["o1"]}}
        ]

trace_2 = [
            {"activity": "Register Guest", "objects": {"Guest": ["g1"]}},
            {"activity": "Create Booking", "objects": {"Guest": ["g1"], "Booking": ["b1"]}},
            {"activity": "Reserve Room", "objects": {"Booking": ["b1"]}},
            {"activity": "Check In", "objects": {"Guest": ["g1"], "Booking": ["b1"]}},
            {"activity": "Check Out", "objects": {"Guest": ["g1"], "Booking": ["b1"]}}
        ]

# The test source takes a list of traces, where each entry is a tuple containing
# the trace structure and the number of times it should be repeated.
test_source = dict_test_ocel_source(
    [
        (trace_1, 2),  # Repeat trace_1 twice
        (trace_2, 3)   # Repeat trace_2 three times
    ],
    shuffle = True  # Shuffle the traces for variability
                    # Note: The shuffle works by shuffling the order of traces as a whole, not the activities within each trace.
)

test_source.subscribe(
    lambda x: print(str(x))
)

{'ocel:eid': 'e0', 'ocel:activity': 'Register Customer', 'ocel:timestamp': datetime.datetime(2025, 7, 16, 1, 44, 15, 257679), 'ocel:omap': {'Customer': {'c1_1'}}, 'ocel:vmap': {}}
{'ocel:eid': 'e1', 'ocel:activity': 'Create Order', 'ocel:timestamp': datetime.datetime(2025, 7, 16, 1, 44, 15, 257679), 'ocel:omap': {'Customer': {'c1_1'}, 'Order': {'o1_1'}}, 'ocel:vmap': {}}
{'ocel:eid': 'e2', 'ocel:activity': 'Add Item', 'ocel:timestamp': datetime.datetime(2025, 7, 16, 1, 44, 15, 257679), 'ocel:omap': {'Order': {'o1_1'}, 'Item': {'i1_1'}}, 'ocel:vmap': {}}
{'ocel:eid': 'e3', 'ocel:activity': 'Add Item', 'ocel:timestamp': datetime.datetime(2025, 7, 16, 1, 44, 15, 257679), 'ocel:omap': {'Order': {'o1_1'}, 'Item': {'i2_1'}}, 'ocel:vmap': {}}
{'ocel:eid': 'e4', 'ocel:activity': 'Ship Order', 'ocel:timestamp': datetime.datetime(2025, 7, 16, 1, 44, 15, 257679), 'ocel:omap': {'Item': {'i2_1', 'i1_1'}, 'Order': {'o1_1'}}, 'ocel:vmap': {}}
{'ocel:eid': 'e5', 'ocel:activity': 'Register Guest', 'oce

<reactivex.disposable.disposable.Disposable at 0x1f36d55b4a0>
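The output above suggests that every repetition of a trace receives its own suffix on the object identifiers (`c1` becomes `c1_1`), while event identifiers count upwards across the whole stream. The following pure-Python sketch reproduces that expansion under those assumptions. The helper `expand_trace` is hypothetical and is not pyBeamline's implementation; shuffling is left out.

```python
from datetime import datetime
from itertools import count


def expand_trace(trace, repetitions, eid_counter):
    """Yield OCEL-style event dicts for `repetitions` copies of `trace`.

    Hypothetical helper: object ids get the suffix `_<repetition>` and
    event ids are drawn from a shared counter.
    """
    for rep in range(1, repetitions + 1):
        for step in trace:
            yield {
                "ocel:eid": f"e{next(eid_counter)}",
                "ocel:activity": step["activity"],
                "ocel:timestamp": datetime.now(),
                # Suffix every object id so that repetitions do not share objects
                "ocel:omap": {
                    obj_type: {f"{obj_id}_{rep}" for obj_id in obj_ids}
                    for obj_type, obj_ids in step["objects"].items()
                },
                "ocel:vmap": {},
            }


trace = [
    {"activity": "Register Customer", "objects": {"Customer": ["c1"]}},
    {"activity": "Create Order", "objects": {"Customer": ["c1"], "Order": ["o1"]}},
]

events = list(expand_trace(trace, repetitions=2, eid_counter=count()))
for event in events:
    print(event["ocel:eid"], event["ocel:activity"], event["ocel:omap"])
```

Because the suffix is applied per repetition, the relationships between objects and activities stay identical in every generated instance, which is why object identifiers only need to be unique across the trace templates themselves.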

In [17]:
#!wget https://raw.githubusercontent.com/beamline/pybeamline/refs/heads/master/tests/logistics.jsonocel

In [20]:
# Now let's consider the OCEL 2.0 source, which reads an OCEL file and emits object-centric events.
from pybeamline.sources.ocel2_log_source_from_file import ocel2_log_source_from_file
from reactivex import operators as ops
import os

# Check if the file exists
if not os.path.exists("tests/logistics.jsonocel"):
    raise FileNotFoundError("The OCEL file 'logistics.jsonocel' does not exist. Please download it from the repository.")
else:
    print("The OCEL file 'logistics.jsonocel' exists.")

# Assuming you have an OCEL file
source = ocel2_log_source_from_file("tests/logistics.jsonocel")




#source.pipe(
#    ops.take(10),  # Limit to the first 10 events for demonstration
#    ops.do_action(print)
#).subscribe(
#    lambda x: print(str(x))
#)

The OCEL file 'logistics.jsonocel' exists.


AttributeError: 'NoneType' object has no attribute 'event_timestamp'