Skip to content

Commit

Permalink
fix: refactored flood to better behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare committed Sep 2, 2021
1 parent efd1325 commit 0b47c01
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 43 deletions.
149 changes: 107 additions & 42 deletions aw_transform/flood.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,107 @@
logger = logging.getLogger(__name__)


def _flood(e1: Event, e2: Event):
"""Floods the larger event over the smaller event, in-place"""
# Prioritize flooding from the longer event
# NOTE: Perhaps better to flood from the former event?
e2_end = e2.timestamp + e2.duration
if e1.duration >= e2.duration:
if e1.data == e2.data:
# Extend e1 to the end of e2
# Set duration of e2 to zero (mark to delete)
e1.duration = e2_end - e1.timestamp
e2.timestamp = e2_end
e2.duration = timedelta(0)
else:
# Extend e1 to the start of e2
e1.duration = e2.timestamp - e1.timestamp
else:
if e1.data == e2.data:
# Extend e2 to the start of e1, discard e1
e2.timestamp = e1.timestamp
e2.duration = e2_end - e2.timestamp
e1.duration = timedelta(0)
else:
# Extend e2 backwards to end of e1
e2.timestamp = e1.timestamp + e1.duration
e2.duration = e2_end - e2.timestamp


def _flood_first(e1: Event, e2: Event):
"""Floods the larger event over the smaller event, in-place"""
# Prioritize flooding from the longer event
# NOTE: Perhaps better to flood from the former event?
e2_end = e2.timestamp + e2.duration
if e1.duration >= e2.duration:
if e1.data == e2.data:
# Extend e1 to the end of e2
# Set duration of e2 to zero (mark to delete)
e1.duration = e2_end - e1.timestamp
e2.timestamp = e2_end
e2.duration = timedelta(0)
else:
# Extend e1 to the start of e2
e1.duration = e2.timestamp - e1.timestamp
else:
if e1.data == e2.data:
# Extend e2 to the start of e1, discard e1
e2.timestamp = e1.timestamp
e2.duration = e2_end - e2.timestamp
e1.duration = timedelta(0)
else:
# Extend e2 backwards to end of e1
e2.timestamp = e1.timestamp + e1.duration
e2.duration = e2_end - e2.timestamp


def _trim(e1: Event, e2: Event):
"""Trims the part of a smaller event covered by a larger event, in-place"""
e1_end = e1.timestamp + e1.duration
e2_end = e2.timestamp + e2.duration

if e1.duration > e2.duration:
# Trim e2 to remove overlap
e2.timestamp = e1_end
e2.duration = e2_end - e1_end
else:
# Trim e1 to remove overlap
e1.duration = e2.timestamp - e1.timestamp


def flood(events: List[Event], pulsetime: float = 5) -> List[Event]:
"""
Takes a list of events and "floods" any empty space between events by extending one of the surrounding events to cover the empty space.
Floods event to the nearest neighbouring event if within the specified ``pulsetime``.
Takes a list of ``events`` and "floods" empty space between events smaller than ``pulsetime``, by extending one of the surrounding events to cover the empty space.
Also merges events if they have the same data and are within the pulsetime
Originally written in aw-research: https://github.com/ActivityWatch/aw-analysis/blob/7da1f2cd8552f866f643501de633d74cdecab168/aw_analysis/flood.py
Also implemented in aw-server-rust: https://github.com/ActivityWatch/aw-server-rust/blob/master/aw-transform/src/flood.rs
# Example
```ignore
pulsetime: 1 second (one space)
input: [a] [a] [b][b] [b][c]
output: [a ][b ] [b][c]
```
For more details on flooding, see this issue:
- https://github.com/ActivityWatch/activitywatch/issues/124
NOTE: This algorithm has a lot of smaller details that need to be
carefully considered by anyone wishing to edit it, see:
- https://github.com/ActivityWatch/aw-core/pull/73
"""
# Originally written in aw-research: https://github.com/ActivityWatch/aw-analysis/blob/7da1f2cd8552f866f643501de633d74cdecab168/aw_analysis/flood.py
# NOTE: This algorithm has a lot of smaller details that need to be
# carefully considered by anyone wishing to edit it, see:
# - https://github.com/ActivityWatch/aw-core/pull/73

events = deepcopy(events)
events = sorted(events, key=lambda e: e.timestamp)
events = sorted(events, key=lambda e: (e.timestamp, e.duration))

# If negative gaps are smaller than this, prune them to become zero
negative_gap_trim_thres = timedelta(seconds=0.1)
gap_trim_thres = timedelta(seconds=0.1)

warned_about_negative_gap_safe = False
warned_about_negative_gap_unsafe = False
Expand All @@ -44,45 +128,26 @@ def flood(events: List[Event], pulsetime: float = 5) -> List[Event]:
e2.timestamp, e2.duration = end, timedelta(0)
if not warned_about_negative_gap_safe:
logger.warning(
"Gap was of negative duration but could be safely merged ({}s). This message will only show once per batch.".format(
gap.total_seconds()
)
f"Gap was negative but could be safely merged ({gap.total_seconds()}s). Will only warn once per batch." # {e1.data}
)
logger.debug(f"{e1.data}")
warned_about_negative_gap_safe = True
elif gap < -negative_gap_trim_thres and not warned_about_negative_gap_unsafe:
elif gap < -gap_trim_thres:
# Events with negative gap but differing data cannot be merged safely
logger.warning(
"Gap was of negative duration and could NOT be safely merged ({}s). This warning will only show once per batch.".format(
gap.total_seconds()
# We still need to get rid of the gap however, so we will trim the smaller event.
# TODO: This might be a bad idea, could lead to a huge chunk of non-AFK time getting whacked, or vice versa.
_trim(e1, e2)

if not warned_about_negative_gap_unsafe:
logger.warning(
f"Gap was negative and could NOT be safely merged ({gap.total_seconds()}s). Will only warn once per batch."
)
)
warned_about_negative_gap_unsafe = True
# logger.warning("Event 1 (id {}): {} {}".format(e1.id, e1.timestamp, e1.duration))
# logger.warning("Event 2 (id {}): {} {}".format(e2.id, e2.timestamp, e2.duration))
elif -negative_gap_trim_thres < gap <= timedelta(seconds=pulsetime):
e2_end = e2.timestamp + e2.duration

# Prioritize flooding from the longer event
if e1.duration >= e2.duration:
if e1.data == e2.data:
# Extend e1 to the end of e2
# Set duration of e2 to zero (mark to delete)
e1.duration = e2_end - e1.timestamp
e2.timestamp = e2_end
e2.duration = timedelta(0)
else:
# Extend e1 to the start of e2
e1.duration = e2.timestamp - e1.timestamp
else:
if e1.data == e2.data:
# Extend e2 to the start of e1, discard e1
e2.timestamp = e1.timestamp
e2.duration = e2_end - e2.timestamp
e1.duration = timedelta(0)
else:
# Extend e2 backwards to end of e1
e2.timestamp = e1.timestamp + e1.duration
e2.duration = e2_end - e2.timestamp
logger.debug(f"{e1.data} != {e2.data}")
warned_about_negative_gap_unsafe = True
# logger.warning("Event 1 (id {}): {} {}".format(e1.id, e1.timestamp, e1.duration))
# logger.warning("Event 2 (id {}): {} {}".format(e2.id, e2.timestamp, e2.duration))
elif -gap_trim_thres < gap <= timedelta(seconds=pulsetime):
_flood(e1, e2)

# Filter out remaining zero-duration events
events = [e for e in events if e.duration > timedelta(0)]
Expand Down
31 changes: 30 additions & 1 deletion tests/test_flood.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_flood_negative_gap_differing_data():
Event(timestamp=now, duration=100, data={"b": 1}),
]
flooded = flood(events)
assert flooded == events
assert flooded == [events[1]]


def test_flood_negative_small_gap_differing_data():
Expand All @@ -78,3 +78,32 @@ def test_flood_negative_small_gap_differing_data():
flooded = flood(events)
duration = sum((e.duration for e in flooded), timedelta(0))
assert duration == timedelta(seconds=100 + 99.99)


def test_flood_idempotent():
events = [
# slight overlap, same data
Event(timestamp=now, duration=10, data={"a": 0}),
Event(timestamp=now + 9 * td1s, duration=5, data={"a": 0}),
# different data, no overlap
Event(timestamp=now + 15 * td1s, duration=5, data={"b": 0}),
]
flood_first = flood(events, pulsetime=0)
flooded = flood_first
for i in range(2):
flooded = flood(flooded, pulsetime=0)
assert flood_first == flooded

assert sum((e.duration for e in flooded), timedelta(0)) == 19 * td1s


def test_flood_unsafe_gap():
events = [
# slight overlap, different data
Event(timestamp=now, duration=10, data={"a": 0}),
Event(timestamp=now + 9 * td1s, duration=5, data={"b": 0}),
]
flooded = flood(events, pulsetime=0)

# The total duration should not exceed the range duration
assert sum((e.duration for e in flooded), timedelta(0)) == 14 * td1s

0 comments on commit 0b47c01

Please sign in to comment.