-
Notifications
You must be signed in to change notification settings - Fork 453
/
workload.py
56 lines (42 loc) · 1.58 KB
/
workload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from psycopg import Cursor
from materialize.scalability.endpoint import Endpoint
from materialize.scalability.operation import Operation
from materialize.scalability.operation_data import OperationData
from materialize.scalability.schema import Schema
class Workload:
def init_operations(self) -> list[Operation]:
return []
def operations(self) -> list[Operation]:
raise NotImplementedError
def execute_operation(
self,
operation: Operation,
cursor: Cursor,
worker_id: int,
transaction_index: int,
verbose: bool,
) -> None:
data = OperationData(cursor, worker_id)
self.amend_data_before_execution(data)
if verbose:
print(f"#{transaction_index}: {operation} (worker_id={worker_id})")
operation.execute(data)
def amend_data_before_execution(self, data: OperationData) -> None:
pass
def name(self) -> str:
return self.__class__.__name__
class WorkloadWithContext(Workload):
endpoint: Endpoint
schema: Schema
def set_endpoint(self, endpoint: Endpoint) -> None:
self.endpoint = endpoint
def set_schema(self, schema: Schema) -> None:
self.schema = schema