In [1]:
# Ensure project root (OpenCEP) is on sys.path for imports like `from base.Pattern import Pattern`
import sys, os, pathlib

nb_dir = (
    pathlib.Path(__file__).parent if "__file__" in globals() else pathlib.Path.cwd()
)
project_root = str((nb_dir / "..").resolve())
if project_root not in sys.path:
    sys.path.insert(0, project_root)

In [2]:
from datetime import timedelta
from CEP import CEP
from base.Pattern import Pattern
from base.PatternStructure import (
    SeqOperator,
    PrimitiveEventStructure,
    KleeneClosureOperator,
)
from condition.CompositeCondition import AndCondition
from condition.Condition import Variable, SimpleCondition
from condition.KCCondition import KCIndexCondition
from stream.FileStream import FileOutputStream
from stream.DataFrameStream import CitiBikeDataFrameInputStream
from plugin.citibike.CitiBike2 import (
    CitiBikeTripEventTypeClassifier,
    CitiBikeDataFormatter,
)
import test
from tree.PatternMatchStorage import TreeStorageParameters
from parallel.ParallelExecutionParameters import (
    DataParallelExecutionParametersHirzelAlgorithm,
)
from parallel.ParallelExecutionPlatforms import ParallelExecutionPlatforms
from plugin.citibike.RessourceConsumption import RessourceConsumption

monitor = RessourceConsumption()


In [3]:
citibikeHotPathsPattern = Pattern(
    SeqOperator(
        KleeneClosureOperator(PrimitiveEventStructure("CitiBikeTrip", "a"), max_size=5),
        PrimitiveEventStructure("CitiBikeTrip", "b"),
    ),
    AndCondition(
        KCIndexCondition(
            names={"a"},
            getattr_func=lambda x: x["bikeid"],
            relation_op=lambda a1, a2: a1 == a2,
            offset=-1,
        ),
        KCIndexCondition(
            names={"a"},
            getattr_func=lambda x: (
                int(float(x["startstationid"])),
                int(float(x["endstationid"])),
            ),
            relation_op=lambda a1, a2: a1[0] == a2[1],
            offset=-1,
        ),
        SimpleCondition(
            Variable("a", lambda x: x[-1]["bikeid"]),
            Variable("b", lambda x: x["bikeid"]),
            relation_op=lambda a, b: a == b,
        ),
        SimpleCondition(
            Variable("a", lambda x: int(float(x[-1]["endstationid"]))),
            Variable("b", lambda x: int(float(x["startstationid"]))),
            relation_op=lambda a, b: a == b,
        ),
        SimpleCondition(
            Variable("b", lambda x: int(float(x["endstationid"]))),
            relation_op=lambda end_id: str(end_id) in {"111111"},
        ),
    ),
    timedelta(minutes=31),
)

In [4]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated-small.csv"
output_file = "output_citibike_baseline_small_try.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)
cep = CEP([citibikeHotPathsPattern])
monitor.run(
    cep.run,
    events,
    FileOutputStream("../test/demo/Matches/testing", output_file),
    CitiBikeDataFormatter(),
)

Creating evaluation manager...
 - Parallel execution: None
 - Storage: None
 - Using ParallelExecutionModes.SEQUENTIAL execution mode
!!! Using default evaluation mechanism parameters...
!!! Creating tree-based evaluation mechanism...
Tree calling create_storage_unit with storage_params: TreeStorageParameters(sort_storage=False, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=False, load_shedding_threshold=1000, load_shedding_drop_rate=0.1, load_shedding_strategy=random,latency_bound=None)
InternalNode creating storage: sort=False, sorting_key=None
UnsortedPatternMatchStorage created with storage_params: TreeStorageParameters(sort_storage=False, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=False, load_shedding_threshold=1000, load_shedding_drop_rate=0.1, load_shedding_strategy=random,latency_bound=None)
InternalNode creating storage: sort=False, sorting_key=None
Unso

44.117049

# Baseline - small dataset

In [4]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated-small.csv"
output_file = "output_citibike_baseline_small.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)
cep = CEP([citibikeHotPathsPattern])
monitor.run(
    cep.run,
    events,
    FileOutputStream("../test/demo/Matches/testing", output_file),
    CitiBikeDataFormatter(),
)

Creating evaluation manager...
 - Parallel execution: None
 - Storage: None
 - Using ParallelExecutionModes.SEQUENTIAL execution mode
!!! Using default evaluation mechanism parameters...
!!! Creating tree-based evaluation mechanism...
Tree calling create_storage_unit with storage_params: TreeStorageParameters(sort_storage=False, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=False, load_shedding_threshold=1000, load_shedding_drop_rate=0.1, load_shedding_strategy=random,latency_bound=None)
InternalNode creating storage: sort=False, sorting_key=None
UnsortedPatternMatchStorage created with storage_params: TreeStorageParameters(sort_storage=False, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=False, load_shedding_threshold=1000, load_shedding_drop_rate=0.1, load_shedding_strategy=random,latency_bound=None)
InternalNode creating storage: sort=False, sorting_key=None
Unso

44.733967

# Both small dataste

In [7]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated-small.csv"
output_file = "output_citibike_both_small.txt"

load_shedding_params = TreeStorageParameters(
    sort_storage=True,
    enable_load_shedding=True,
    load_shedding_threshold=50,
    load_shedding_drop_rate=0.3,
    load_shedding_strategy="oldest",
    clean_up_interval=10,
    latency_bound=0.1
)
dp_params = DataParallelExecutionParametersHirzelAlgorithm(
    platform=ParallelExecutionPlatforms.THREADING,
    units_number=12,  # how many threads
    key="bikeid",
)
cep = CEP([citibikeHotPathsPattern], parallel_execution_params=dp_params, storage_params=load_shedding_params )

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)

cep = CEP([citibikeHotPathsPattern])
monitor.run(
    cep.run,
    events,
    FileOutputStream("../test/demo/Matches/testing", output_file),
    CitiBikeDataFormatter(),
)

Creating evaluation manager...
 - Parallel execution: <parallel.ParallelExecutionParameters.DataParallelExecutionParametersHirzelAlgorithm object at 0x11c4a4dd0>
 - Storage: TreeStorageParameters(sort_storage=True, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=True, load_shedding_threshold=50, load_shedding_drop_rate=0.3, load_shedding_strategy=oldest,latency_bound=0.1)
 - Using ParallelExecutionModes.DATA_PARALLELISM execution mode
!!! Using default evaluation mechanism parameters...
!!! Creating tree-based evaluation mechanism...
Tree calling create_storage_unit with storage_params: TreeStorageParameters(sort_storage=True, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=True, load_shedding_threshold=50, load_shedding_drop_rate=0.3, load_shedding_strategy=oldest,latency_bound=0.1)
InternalNode creating storage: sort=True, sorting_key=None
UnsortedPatternMatchStorage 

44.883261

In [None]:
# Parallelization only - big dataset

In [None]:
dp_params = DataParallelExecutionParametersHirzelAlgorithm(
    platform=ParallelExecutionPlatforms.THREADING,
    units_number=12,  # how many threads
    key="bikeid",
)
cep = CEP([citibikeHotPathsPattern], parallel_execution_params=dp_params)

In [None]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated.csv"
output_file = "output_citibike_only_parallel.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)


In [None]:
# cep.run(
#     events,
#     FileOutputStream("../test/demo/Matches/testing", output_file),
#     CitiBikeDataFormatter(),
# )
#ran for 513 min

# Parallelization only - small dataset

In [None]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated-small.csv"
output_file = "output_citibike_only_parallel-small.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)

In [None]:
cep.run(
    events,
    FileOutputStream("../test/demo/Matches/testing", output_file),
    CitiBikeDataFormatter(),
)

# Parallelization & Load shedding

In [None]:
load_shedding_params = TreeStorageParameters(
    sort_storage=True,
    enable_load_shedding=True,
    load_shedding_threshold=15,
    load_shedding_drop_rate=0.3,
    load_shedding_strategy="oldest",
    clean_up_interval=10,
)
dp_params = DataParallelExecutionParametersHirzelAlgorithm(
    platform=ParallelExecutionPlatforms.THREADING,
    units_number=8,  # how many threads
    key="bikeid",
)
cep = CEP([citibikeHotPathsPattern], parallel_execution_params=dp_params, storage_params=load_shedding_params )


In [None]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated.csv"
output_file = "output_citibike_both.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)

In [None]:
cep.run(
    events,
    FileOutputStream("../test/demo/Matches/testing", output_file),
    CitiBikeDataFormatter(),
)# ran for an hour

# Load shedding 2

In [None]:
load_shedding_params = TreeStorageParameters(
    sort_storage=True,
    enable_load_shedding=True,
    load_shedding_threshold=15,
    load_shedding_drop_rate=0.3,
    load_shedding_strategy="oldest",
    clean_up_interval=10,
)
dp_params = DataParallelExecutionParametersHirzelAlgorithm(
    platform=ParallelExecutionPlatforms.THREADING,
    units_number=8,  # how many threads
    key="bikeid",
)
cep = CEP([citibikeHotPathsPattern], parallel_execution_params=dp_params, storage_params=load_shedding_params )


In [None]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated.csv"
output_file = "output_citibike_both-2.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)

In [None]:
cep.run(
    events,
    FileOutputStream("../test/demo/Matches/testing", output_file),
    CitiBikeDataFormatter(),
)

# Both after merge

In [None]:
load_shedding_params = TreeStorageParameters(
    sort_storage=True,
    enable_load_shedding=True,
    load_shedding_threshold=15,
    load_shedding_drop_rate=0.3,
    load_shedding_strategy="oldest",
    clean_up_interval=10,
    latency_bound=0.01
)
dp_params = DataParallelExecutionParametersHirzelAlgorithm(
    platform=ParallelExecutionPlatforms.THREADING,
    units_number=8,  # how many threads
    key="bikeid",
)
cep = CEP([citibikeHotPathsPattern], parallel_execution_params=dp_params, storage_params=load_shedding_params )


In [None]:
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated-small.csv"
output_file = "output_citibike_both-merge.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)

In [None]:
monitor.run(
    cep.run,
    events,
    FileOutputStream("../test/demo/Matches", output_file),
    CitiBikeDataFormatter(),
)

# Both big dataset

In [4]:
load_shedding_params = TreeStorageParameters(
    sort_storage=True,
    enable_load_shedding=True,
    load_shedding_threshold=50,
    load_shedding_drop_rate=0.3,
    load_shedding_strategy="oldest",
    clean_up_interval=10,
    latency_bound=0.1
)
dp_params = DataParallelExecutionParametersHirzelAlgorithm(
    platform=ParallelExecutionPlatforms.THREADING,
    units_number=12,  # how many threads
    key="bikeid",
)
cep = CEP([citibikeHotPathsPattern], parallel_execution_params=dp_params, storage_params=load_shedding_params )
input_file = "../test/EventFiles/201901-citibike-tripdata-1-fabricated.csv"
output_file = "output_citibike_both-merge-big-0.1latency.txt"

events = CitiBikeDataFrameInputStream(
    input_file,
    timestamp_column="starttime",
)


Creating evaluation manager...
 - Parallel execution: <parallel.ParallelExecutionParameters.DataParallelExecutionParametersHirzelAlgorithm object at 0x119ac7b90>
 - Storage: TreeStorageParameters(sort_storage=True, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=True, load_shedding_threshold=50, load_shedding_drop_rate=0.3, load_shedding_strategy=oldest,latency_bound=0.1)
 - Using ParallelExecutionModes.DATA_PARALLELISM execution mode
!!! Using default evaluation mechanism parameters...
!!! Creating tree-based evaluation mechanism...
Tree calling create_storage_unit with storage_params: TreeStorageParameters(sort_storage=True, attributes_priorities={}, clean_up_interval=10, prioritize_sorting_by_timestamp=True, enable_load_shedding=True, load_shedding_threshold=50, load_shedding_drop_rate=0.3, load_shedding_strategy=oldest,latency_bound=0.1)
InternalNode creating storage: sort=True, sorting_key=None
UnsortedPatternMatchStorage 

In [None]:
monitor.run(
    cep.run,
    events,
    FileOutputStream("../test/demo/Matches/testing", output_file),
    CitiBikeDataFormatter(),
)

Starting CEP evaluation...
Using optimized DataFrame input stream processing
we are updating event count 0
UnsortedPatternMatchStorage.add() called! Total matches: 0
computed currlat None
Found match: {'tripduration': '1408', 'starttime': 2019-01-01 00:31:26.061000, 'stoptime': 2019-01-01 00:54:54.412000, 'startstationid': '327', 'endstationid': '2021', 'bikeid': '15839', 'eventid': 48}
{'tripduration': '138', 'starttime': 2019-01-01 00:54:59.190000, 'stoptime': 2019-01-01 00:57:17.802000, 'startstationid': '2021', 'endstationid': '111111', 'bikeid': '15839', 'eventid': 107}


UnsortedPatternMatchStorage.add() called! Total matches: 0
computed currlat 0.875365
comp latency not none
inside latency computation
Found match: {'tripduration': '700', 'starttime': 2019-01-01 00:37:51.478000, 'stoptime': 2019-01-01 00:49:31.666000, 'startstationid': '490', 'endstationid': '340', 'bikeid': '30705', 'eventid': 60}
{'tripduration': '2531', 'starttime': 2019-01-01 00:49:33.962000, 'stoptime': 2019

Exception in thread Thread-6 (_run):
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/Users/cricoche/Desktop/aalto_master/SSDM/OpenCEP/OpenCEP/venv/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 772, in run_closure
    _threading_Thread_run(self)
  File "/opt/miniconda3/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/cricoche/Desktop/aalto_master/SSDM/OpenCEP/OpenCEP/parallel/data_parallel/DataParallelExecutionAlgorithm.py", line 119, in _run
    evaluation_manager.eval(events, matches, data_formatter)
  File "/Users/cricoche/Desktop/aalto_master/SSDM/OpenCEP/OpenCEP/parallel/manager/SequentialEvaluationManager.py", line 32, in eval
    self.__eval_mechanism.eval(event_stream, pattern_matches, data_formatter)
  File "/Users/cricoche/Desktop/aalto_master/SSDM/OpenCEP/OpenCEP/tree/evaluation/TreeBasedEvaluationMechanism.py"

we are updating event count 248000
UnsortedPatternMatchStorage.add() called! Total matches: 1
computed currlat 0.4508430999999912
comp latency not none
inside latency computation
Found match: {'tripduration': '104', 'starttime': 2019-01-08 12:53:33.200000, 'stoptime': 2019-01-08 12:55:17.645000, 'startstationid': '326', 'endstationid': '236', 'bikeid': '34654', 'eventid': 246900}
{'tripduration': '356', 'starttime': 2019-01-08 12:55:20.357000, 'stoptime': 2019-01-08 13:01:17.326000, 'startstationid': '236', 'endstationid': '111111', 'bikeid': '34654', 'eventid': 246965}


UnsortedPatternMatchStorage.add() called! Total matches: 3
computed currlat 0.4122501999999824
comp latency not none
inside latency computation
Found match: {'tripduration': '73', 'starttime': 2019-01-08 13:11:45.406000, 'stoptime': 2019-01-08 13:12:58.484000, 'startstationid': '259', 'endstationid': '427', 'bikeid': '33615', 'eventid': 247450}
{'tripduration': '1290', 'starttime': 2019-01-08 13:13:02.735000, 'stoptim