-
Notifications
You must be signed in to change notification settings - Fork 2
/
resource_unavailability.py
158 lines (130 loc) · 6.71 KB
/
resource_unavailability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import datetime
from typing import List, Optional
import pandas as pd
from wta import log_ids_non_nil, EventLogIDs
from wta.calendars import calendars
from wta.calendars.calendars import UNDIFFERENTIATED_RESOURCE_POOL_KEY
from wta.calendars.intervals import pd_interval_to_interval, Interval, subtract_intervals
def other_processing_events_during_waiting_time_of_event(
        event_index: pd.Index,
        log: pd.DataFrame,
        log_ids: Optional[EventLogIDs] = None) -> pd.DataFrame:
    """
    Returns a dataframe with all other events of the same resource that overlap the waiting time of the given
    event, i.e., events that started before event_start_time and ended after event_enabled_time.

    :param event_index: Index (or a single index label) of the event for which the waiting time is taken into account.
    :param log: Log dataframe.
    :param log_ids: Event log IDs (column names), resolved through log_ids_non_nil.
    :return: Rows of the log performed by the event's resource, excluding the event itself, that overlap
        the waiting time of the event.
    """
    log_ids = log_ids_non_nil(log_ids)

    event = log.loc[event_index]
    if isinstance(event, pd.Series):
        # a single label selects one row as a Series; normalize it to a one-row dataframe
        event = event.to_frame().T

    # current event variables
    event_start_time = pd.to_datetime(event[log_ids.start_time].values[0], utc=True)
    event_enabled_time = pd.to_datetime(event[log_ids.enabled_time].values[0], utc=True)
    resource = event[log_ids.resource].values[0]

    # resource events throughout the event log except the current event
    resource_events = log[log[log_ids.resource] == resource]
    # Index.difference only accepts list-like input, so a single label (the case that produces
    # the Series above) has to be wrapped before it can be excluded
    excluded_labels = event_index if pd.api.types.is_list_like(event_index) else [event_index]
    resource_events = resource_events.loc[resource_events.index.difference(excluded_labels)]

    # taking activities of the resource that started before event_start_time and ended after event_enabled_time
    overlaps_waiting_time = (
        (resource_events[log_ids.start_time] < event_start_time) &
        (resource_events[log_ids.end_time] > event_enabled_time)
    )
    return resource_events[overlaps_waiting_time]
def non_processing_intervals(
        event_index: pd.Index,
        log: pd.DataFrame,
        log_ids: Optional[EventLogIDs] = None) -> List[Interval]:
    """
    Returns the parts of the event's waiting time (from its enabled time to its start time) that are left after
    subtracting the other processing events found by other_processing_events_during_waiting_time_of_event.

    :param event_index: Index of the event for which the waiting time is taken into account.
    :param log: Log dataframe.
    :param log_ids: Event log IDs.
    :return: The converted waiting-time interval as is when no other processing event is found, otherwise
        the result of subtract_intervals.
    """
    log_ids = log_ids_non_nil(log_ids)

    target = log.loc[event_index]
    if isinstance(target, pd.Series):
        target = target.to_frame().T

    # waiting time of the event, expressed in UTC
    started_at = pd.to_datetime(target[log_ids.start_time].values[0], utc=True)
    enabled_at = pd.to_datetime(target[log_ids.enabled_time].values[0], utc=True)
    waiting = pd_interval_to_interval(pd.Interval(enabled_at, started_at))

    overlapping = other_processing_events_during_waiting_time_of_event(event_index, log, log_ids=log_ids)
    if len(overlapping) == 0:
        # nothing else was processed, the waiting time stays untouched
        return waiting

    # collect the intervals of every other processing event, in log order
    busy = []
    for began, finished in zip(overlapping[log_ids.start_time], overlapping[log_ids.end_time]):
        busy.extend(pd_interval_to_interval(pd.Interval(began, finished)))

    return subtract_intervals(waiting, busy)
def detect_unavailability_intervals(
        event_index: pd.Index,
        log: pd.DataFrame,
        log_calendar: dict,
        differentiated=True,
        log_ids: Optional[EventLogIDs] = None) -> List[Interval]:
    """
    Walks from the enabled time to the start time of the event and collects the pandas intervals that are
    not covered by the working intervals obtained from calendars.resource_working_hours_as_intervals.

    :param event_index: Index of the event whose waiting time is inspected.
    :param log: Log dataframe.
    :param log_calendar: Calendar data passed on to calendars.resource_working_hours_as_intervals.
    :param differentiated: When true, the calendar of the event's resource is used, otherwise the one stored
        under UNDIFFERENTIATED_RESOURCE_POOL_KEY.
    :param log_ids: Event log IDs.
    :return: List of pd.Interval objects between enabled time and start time; empty when the enabled time is
        not before the start time.
    """
    log_ids = log_ids_non_nil(log_ids)

    row = log.loc[event_index]
    if isinstance(row, pd.Series):
        row = row.to_frame().T

    resource = row[log_ids.resource].values[0] if differentiated else UNDIFFERENTIATED_RESOURCE_POOL_KEY
    start_time = pd.Timestamp(row[log_ids.start_time].values[0])
    enabled_time = pd.Timestamp(row[log_ids.enabled_time].values[0])

    # TODO: do tz localization somewhere earlier (preprocessing module) on the whole log
    start_time = __ensure_timestamp_tz(start_time, enabled_time.tz)
    enabled_time = __ensure_timestamp_tz(enabled_time, start_time.tz)

    gaps = []
    if not (enabled_time < start_time):
        # no waiting time, hence nothing to report
        return gaps

    work_intervals = calendars.resource_working_hours_as_intervals(resource, log_calendar)
    midnight = datetime.time.fromisoformat("00:00:00.000000")

    cursor = enabled_time
    while cursor < start_time:
        jump_to = None
        # working intervals whose left day matches the week day of the cursor
        todays = [
            candidate
            for candidate in work_intervals
            if cursor.weekday() == candidate.left_day.value
        ]
        if todays:
            # inside a working interval: skip to its end without recording anything
            # (when several intervals match, the last one wins)
            for candidate in todays:
                opens = candidate.left_time_to_time()
                closes = candidate.right_time_to_time()
                if opens <= cursor.time() < closes:
                    jump_to = pd.Timestamp.combine(cursor.date(), closes).tz_localize(cursor.tz)
            if jump_to is None:
                # outside of every working interval: non-working until the next one starts today
                openings = [candidate.left_time_to_time() for candidate in todays]
                later_openings = [opening for opening in openings if opening > cursor.time()]
                if later_openings:
                    jump_to = pd.Timestamp.combine(cursor.date(), min(later_openings)).tz_localize(cursor.tz)
                    gaps.append(pd.Interval(cursor, min(jump_to, start_time)))
        if jump_to is None:
            # no working interval on this week day, or none starting after the cursor:
            # non-working until the end of the day
            jump_to = pd.Timestamp.combine(
                cursor.date() + pd.Timedelta(days=1),
                midnight
            ).tz_localize(cursor.tz)
            gaps.append(pd.Interval(cursor, min(jump_to, start_time)))
        cursor = jump_to

    return gaps
def __ensure_timestamp_tz(timestamp: pd.Timestamp, tz: Optional[str] = None):
    """
    Returns the timestamp with a timezone attached.

    A timestamp that already has a timezone is returned unchanged. A naive timestamp is localized
    to the given tz, or to UTC when no tz is given.
    """
    if timestamp.tz:
        return timestamp
    return timestamp.tz_localize(tz or 'UTC')