|
| 1 | +""" |
| 2 | +Originally from aw-research |
| 3 | +""" |
| 4 | + |
| 5 | +from copy import deepcopy |
| 6 | +from typing import List, Tuple, Optional |
| 7 | +from datetime import datetime, timedelta, timezone |
| 8 | + |
| 9 | +from timeslot import Timeslot |
| 10 | + |
| 11 | +from aw_core import Event |
| 12 | + |
| 13 | + |
| 14 | +def _split_event(e: Event, dt: datetime) -> Tuple[Event, Optional[Event]]: |
| 15 | + if e.timestamp < dt < e.timestamp + e.duration: |
| 16 | + e1 = deepcopy(e) |
| 17 | + e2 = deepcopy(e) |
| 18 | + e1.duration = dt - e.timestamp |
| 19 | + e2.timestamp = dt |
| 20 | + e2.duration = (e.timestamp + e.duration) - dt |
| 21 | + return (e1, e2) |
| 22 | + else: |
| 23 | + return (e, None) |
| 24 | + |
| 25 | + |
| 26 | +def test_split_event(): |
| 27 | + now = datetime(2018, 1, 1, 0, 0).astimezone(timezone.utc) |
| 28 | + td1h = timedelta(hours=1) |
| 29 | + e = Event(timestamp=now, duration=2 * td1h, data={}) |
| 30 | + e1, e2 = _split_event(e, now + td1h) |
| 31 | + assert e1.timestamp == now |
| 32 | + assert e1.duration == td1h |
| 33 | + assert e2.timestamp == now + td1h |
| 34 | + assert e2.duration == td1h |
| 35 | + |
| 36 | + |
| 37 | +def union_no_overlap(events1: List[Event], events2: List[Event]) -> List[Event]: |
| 38 | + """Merges two eventlists and removes overlap, the first eventlist will have precedence |
| 39 | +
|
| 40 | + Example: |
| 41 | + events1 | xxx xx xxx | |
| 42 | + events1 | ---- ------ -- | |
| 43 | + result | xxx-- xx ----xxx -- | |
| 44 | + """ |
| 45 | + events1 = deepcopy(events1) |
| 46 | + events2 = deepcopy(events2) |
| 47 | + |
| 48 | + # I looked a lot at aw_transform.union when I wrote this |
| 49 | + events_union = [] |
| 50 | + e1_i = 0 |
| 51 | + e2_i = 0 |
| 52 | + while e1_i < len(events1) and e2_i < len(events2): |
| 53 | + e1 = events1[e1_i] |
| 54 | + e2 = events2[e2_i] |
| 55 | + e1_p = Timeslot(e1.timestamp, e1.timestamp + e1.duration) |
| 56 | + e2_p = Timeslot(e2.timestamp, e2.timestamp + e2.duration) |
| 57 | + |
| 58 | + if e1_p.intersects(e2_p): |
| 59 | + if e1.timestamp <= e2.timestamp: |
| 60 | + events_union.append(e1) |
| 61 | + e1_i += 1 |
| 62 | + |
| 63 | + # If e2 continues after e1, we need to split up the event so we only get the part that comes after |
| 64 | + _, e2_next = _split_event(e2, e1.timestamp + e1.duration) |
| 65 | + if e2_next: |
| 66 | + events2[e2_i] = e2_next |
| 67 | + else: |
| 68 | + e2_i += 1 |
| 69 | + else: |
| 70 | + e2_next, e2_next2 = _split_event(e2, e1.timestamp) |
| 71 | + events_union.append(e2_next) |
| 72 | + e2_i += 1 |
| 73 | + if e2_next2: |
| 74 | + events2.insert(e2_i, e2_next2) |
| 75 | + else: |
| 76 | + if e1.timestamp <= e2.timestamp: |
| 77 | + events_union.append(e1) |
| 78 | + e1_i += 1 |
| 79 | + else: |
| 80 | + events_union.append(e2) |
| 81 | + e2_i += 1 |
| 82 | + events_union += events1[e1_i:] |
| 83 | + events_union += events2[e2_i:] |
| 84 | + return events_union |
| 85 | + |
| 86 | + |
| 87 | +def test_union_no_overlap(): |
| 88 | + from pprint import pprint |
| 89 | + |
| 90 | + now = datetime(2018, 1, 1, 0, 0) |
| 91 | + td1h = timedelta(hours=1) |
| 92 | + events1 = [ |
| 93 | + Event(timestamp=now + 2 * i * td1h, duration=td1h, data={"test": 1}) |
| 94 | + for i in range(3) |
| 95 | + ] |
| 96 | + events2 = [ |
| 97 | + Event(timestamp=now + (2 * i + 0.5) * td1h, duration=td1h, data={"test": 2}) |
| 98 | + for i in range(3) |
| 99 | + ] |
| 100 | + |
| 101 | + events_union = _union_no_overlap(events1, events2) |
| 102 | + # pprint(events_union) |
| 103 | + dur = sum((e.duration for e in events_union), timedelta(0)) |
| 104 | + assert dur == timedelta(hours=4, minutes=30) |
| 105 | + assert sorted(events_union, key=lambda e: e.timestamp) |
| 106 | + |
| 107 | + events_union = _union_no_overlap(events2, events1) |
| 108 | + # pprint(events_union) |
| 109 | + dur = sum((e.duration for e in events_union), timedelta(0)) |
| 110 | + assert dur == timedelta(hours=4, minutes=30) |
| 111 | + assert sorted(events_union, key=lambda e: e.timestamp) |
| 112 | + |
| 113 | + events1 = [ |
| 114 | + Event(timestamp=now + (2 * i) * td1h, duration=td1h, data={"test": 1}) |
| 115 | + for i in range(3) |
| 116 | + ] |
| 117 | + events2 = [Event(timestamp=now, duration=5 * td1h, data={"test": 2})] |
| 118 | + events_union = _union_no_overlap(events1, events2) |
| 119 | + pprint(events_union) |
| 120 | + dur = sum((e.duration for e in events_union), timedelta(0)) |
| 121 | + assert dur == timedelta(hours=5, minutes=0) |
| 122 | + assert sorted(events_union, key=lambda e: e.timestamp) |
0 commit comments