-
Notifications
You must be signed in to change notification settings - Fork 2
/
process.py
237 lines (193 loc) · 10.2 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
""" sqlalchemy model for processes and related functions.
A process row is a certain experiment with multiple versions of that process and further metadata.
"""
from datetime import datetime
from typing import Optional
from sqlalchemy import and_
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import ARRAY
from models import db, batch_policy
from models.process_instance import unevaluated_instances_still_exist, ProcessInstance
from models.utils import CASCADING_DELETE, Version, WinningReasonEnum, ExperimentState
class Process(db.Model):
    """ sqlalchemy model for processes/experiments

    One row describes one experiment: the two process variants (A and B) with their
    file paths and Camunda ids, the default version, the history data of the default
    version, the cool-off flag and - once decided - the reason and time of the decision.
    """
    __tablename__ = "process"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    active = db.Column(db.Boolean, nullable=False, default=True)
    # Pass the callable itself (not its result): the timestamp must be taken when a row
    # is inserted, not once when this module is imported.
    datetime_added = db.Column(db.DateTime, nullable=False, default=datetime.now)

    # the two variants under test
    variant_a_path = db.Column(db.String, nullable=False)
    variant_b_path = db.Column(db.String, nullable=False)
    variant_a_camunda_id = db.Column(db.String, nullable=False)
    variant_b_camunda_id = db.Column(db.String, nullable=False)

    # default version and its history
    default_version = db.Column(db.Enum(Version), nullable=False)
    quantiles_default_history = db.Column(ARRAY(db.Float), nullable=False)  # in seconds
    interarrival_default_history = db.Column(db.Float, nullable=False)

    # experiment progress / outcome; the two nullable columns are filled by set_winning
    in_cool_off = db.Column(db.Boolean, nullable=False, default=False)
    winning_reason = db.Column(db.Enum(WinningReasonEnum), nullable=True)
    datetime_decided = db.Column(db.DateTime, nullable=True)

    # dependent rows, all configured with the cascading-delete setting
    batch_policies = relationship("BatchPolicy", cascade=CASCADING_DELETE)
    # lazy="dynamic": attribute access yields a query object (supports .count() / .filter())
    customer_categories = relationship("CustomerCategory", cascade=CASCADING_DELETE, lazy="dynamic")
    process_instances = relationship("ProcessInstance", cascade=CASCADING_DELETE)
    batch_policy_proposals = relationship('BatchPolicyProposal', cascade=CASCADING_DELETE)
class CustomerCategory(db.Model):
    """ sqlalchemy model for one customer category that belongs to a process """
    __tablename__ = "customer_category"

    id = db.Column(db.Integer, primary_key=True)
    # foreign key into the "process" table
    process_id = db.Column(
        db.Integer,
        db.ForeignKey('process.id'),
    )
    name = db.Column(
        db.String(100),
        nullable=False,
    )
    # Will be set after learning is done; NULL until then
    winning_version = db.Column(
        db.Enum(Version),
        nullable=True,
    )
def get_active_process_id() -> int:
    """Get backend id of currently active process.

    :raises AssertionError: When illegal state: Active processes != 1
    :return: process id
    """
    # Fetch the candidates once instead of issuing separate count/count/first queries
    active_processes = db.session.query(Process).filter(Process.active.is_(True)).all()
    # Explicit raise instead of an `assert` statement, so the check is not stripped
    # when Python runs with -O
    if len(active_processes) != 1:
        raise AssertionError("Active processes != 1: " + str(len(active_processes)))
    return active_processes[0].id
def get_process_entry(process_id: int) -> Process:
    """Get the process entry with the given id.

    :raises AssertionError: When the number of processes with this id is not exactly 1
    :param process_id: specify process
    :return: Process entry
    """
    # Fetch the candidates once instead of issuing separate count/count/first queries
    matching_processes = db.session.query(Process).filter(Process.id == process_id).all()
    # Explicit raise instead of an `assert` statement, so the check is not stripped
    # when Python runs with -O
    if len(matching_processes) != 1:
        raise AssertionError(
            f"Expected exactly one process with id {process_id}, found {len(matching_processes)}"
        )
    return matching_processes[0]
def set_winning(process_id: int, decision: list[dict[str, Version]], winning_reason: WinningReasonEnum) -> None:
    """ Finish an experiment: store the winning version per customer category and the winning reason

    :raises RuntimeError: a winning decision has already been set for the process
    :raises RuntimeError: number of decisions differs from the number of customer categories
    :raises RuntimeError: a winning_version is neither Version.A nor Version.B
    :raises RuntimeError: the customer category names do not match those of the process
    :param process_id: process id in backend
    :param decision: A list containing dicts with the decisions for each customer category. Dict format:
    {'customer_category': str, 'winning_version': Version.A or Version.B}
    :param winning_reason: reason for decision
    """
    if is_decision_made(process_id):
        raise RuntimeError("Winning decision already set")

    process = get_process_entry(process_id)
    categories = process.customer_categories

    # --- validate the handed over decision before touching any row ---
    if categories.count() != len(decision):
        raise RuntimeError("Decisions not provided for all customer categories")

    allowed_versions = [Version.A, Version.B]
    decided_names = []
    for entry in decision:
        if entry['winning_version'] not in allowed_versions:
            raise RuntimeError("winning_version must be Version.A or Version.B")
        decided_names.append(entry['customer_category'])

    if sorted(decided_names) != get_sorted_customer_category_list(process_id):
        raise RuntimeError("customer_category invalid for process")

    # --- apply the decision ---
    for entry in decision:
        name_matches = CustomerCategory.name == entry['customer_category']
        matching_category = categories.filter(name_matches).first()
        matching_category.winning_version = entry['winning_version']

    process.winning_reason = winning_reason
    process.datetime_decided = datetime.now()
    db.session.commit()
def is_valid_customer_category(process_id: int, customer_category: str) -> bool:
    """Tell whether a name is one of the customer categories of a certain process.

    :param process_id: process to be checked
    :param customer_category: category name to look for
    :return: True if a customer category of the process carries this name, otherwise False
    """
    process = get_process_entry(process_id)
    for known_category in process.customer_categories:
        if known_category.name == customer_category:
            return True
    return False
def get_sorted_customer_category_list(process_id: int) -> list:
    """Collect the names of all customer categories of a process.

    :param process_id: specify process/experiment
    :return: list of the customer category names in ascending sort order
    """
    process = get_process_entry(process_id)
    return sorted(category.name for category in process.customer_categories)
def is_decision_made(process_id: int) -> bool:
    """Check whether a decision has already been made for a certain process

    A process without any customer category counts as "no decision made".

    :raises RuntimeError: Illegal internal state: When only some of the customer categories have winning versions
    :param process_id: specify process
    :return: True or False
    """
    relevant_process = get_process_entry(process_id)
    # Process.customer_categories is declared lazy="dynamic", so every iteration over it
    # goes back to the database; read it once and evaluate both checks on the result.
    has_winner = [category.winning_version is not None
                  for category in relevant_process.customer_categories]
    if not any(has_winner):
        return False
    if all(has_winner):
        return True
    raise RuntimeError("Either all, or none of the customer categories of a process should have a winning version")
def get_winning(process_id: int) -> Optional[list[dict[str, object]]]:
    """Get a list with the winning process version per customer category.

    Format of list:
    [{
    'customer_category': str,
    'winning_version': Version.A or Version.B
    }]

    :param process_id: specify process
    :return: list with one dict per customer category or None, when no winning version yet
    """
    if not is_decision_made(process_id):
        return None
    relevant_process_entry = get_process_entry(process_id)
    return [
        {"customer_category": customer_category.name,
         "winning_version": customer_category.winning_version}
        for customer_category in relevant_process_entry.customer_categories
    ]
def in_cool_off(process_id: int) -> bool:
    """Read the cool-off flag of a certain process.

    :param process_id: specify process
    :return: value of the in_cool_off column of the first process row with this id
    """
    matching_processes = Process.query.filter(Process.id == process_id)
    process = matching_processes.first()
    return process.in_cool_off
def cool_off_over(process_id: int) -> bool:
    """Tell whether a process is in its cool-off period AND has no unevaluated instances left.

    :param process_id: specify process
    :return: True or False
    """
    currently_cooling = in_cool_off(process_id)
    if not currently_cooling:
        # not in cool-off: hand the flag back as it is, without querying the instances
        return currently_cooling
    return not unevaluated_instances_still_exist(process_id)
def is_in_batch(process_id: int) -> bool:
    """Tell whether a process experiment currently is in an experimental batch.

    Compares the number of process instances that have do_evaluate set with the
    batch size sum reported by the batch policy module.

    :param process_id: process id
    :return: True while that number is below the batch size sum, otherwise False
    """
    evaluated_instances = ProcessInstance.query.filter(
        and_(ProcessInstance.process_id == process_id,
             ProcessInstance.do_evaluate.is_(True))
    )
    evaluated_count = evaluated_instances.count()
    batch_size_sum = batch_policy.get_batch_size_sum(process_id)
    return evaluated_count < batch_size_sum
def get_experiment_state_str(process_id: int) -> str:
    """Describe the current state of the experiment for a certain process as text.

    :raises RuntimeError: Illegal internal state: Decision made without winning reason
    :param process_id: Process id
    :return: value of the experiment state; when a winning reason is stored, followed by
    ", " and the value of that reason
    """
    process = get_process_entry(process_id)
    state = get_experiment_state_enum(process_id)
    reason = process.winning_reason
    if reason is None:
        return state.value
    # a stored winning reason is only expected for a finished experiment
    assert state == ExperimentState.FINISHED
    return ", ".join((state.value, reason.value))
def get_experiment_state_enum(process_id: int) -> ExperimentState:
    """Determine the current state of the experiment for a certain process.

    :raises RuntimeError: Illegal internal state: Decision made without winning reason
    :param process_id: Process id
    :return: State of process experiment
    """
    process = get_process_entry(process_id)

    # decided experiments are finished - provided a reason has been recorded
    if is_decision_made(process_id):
        if process.winning_reason is None:
            raise RuntimeError("Decision made without winning reason.")
        return ExperimentState.FINISHED

    # undecided: the first matching condition determines the state
    if cool_off_over(process.id):
        return ExperimentState.COOL_OFF_FIN_DEC_OUTSTANDING
    if process.in_cool_off:
        return ExperimentState.IN_COOL_OFF
    if is_in_batch(process_id):
        return ExperimentState.RUNNING_IN_BATCH
    if batch_policy.get_latest_bapol_entry(process_id) is None:
        return ExperimentState.RUNNING_BEFORE_FIRST_BATCH_POL
    return ExperimentState.RUNNING_OUTSIDE_BATCH