In [1]:
from __future__ import annotations

from dataclasses import dataclass
from lib2to3.btm_utils import reduce_tree
from typing import List, Tuple, Optional, Dict
from collections import defaultdict, Counter


# ----------------------------
# Dataset (small, Codility-like)
# ----------------------------

@dataclass(frozen=True)
class Event:
    """A single train event record used by all questions in this notebook.

    Instances are immutable because the dataclass is declared with
    ``frozen=True``.
    """

    train_id: int          # identifier of the train this record belongs to
    ts: int                # integer timestamp
    delay_min: int         # can be negative (presumably minutes, per the field name)
    direction: int         # 0=eastbound, 1=westbound


# Sample dataset shared by every question below.
# Positional fields follow the Event declaration: (train_id, ts, delay_min, direction).
# The records are listed in increasing ts order.
EVENTS: List[Event] = [
    Event(101, 1,  2, 0),
    Event(101, 2, -1, 0),
    Event(102, 3,  4, 1),
    Event(101, 5,  3, 1),
    Event(103, 6,  0, 0),
    Event(102, 7, -2, 1),
    Event(101, 8,  5, 0),
    Event(103, 10, 1, 1),
]


In [13]:
import pandas as pd

# Tabular view of EVENTS, assembled one column at a time.
# Note that Event.ts is exposed under the column name "timestamp".
df = pd.DataFrame(
    {
        "train_id": [e.train_id for e in EVENTS],
        "timestamp": [e.ts for e in EVENTS],
        "delay_min": [e.delay_min for e in EVENTS],
        "direction": [e.direction for e in EVENTS],
    }
)

print(df)


   train_id  timestamp  delay_min  direction
0       101          1          2          0
1       101          2         -1          0
2       102          3          4          1
3       101          5          3          1
4       103          6          0          0
5       102          7         -2          1
6       101          8          5          0
7       103         10          1          1


In [2]:

# ----------------------------
# Helpers
# ----------------------------

def delays_for_train(events: List[Event], train_id: int) -> List[int]:
    """Collect the ``delay_min`` values of the events belonging to one train.

    Args:
        events: Events to scan, in any order.
        train_id: Only events whose ``train_id`` equals this value are kept.

    Returns:
        The matching delays, in the same order as they appear in ``events``
        (an empty list when nothing matches).
    """
    matching: List[int] = []
    for event in events:
        if event.train_id == train_id:
            matching.append(event.delay_min)
    return matching


def timestamps(events: List[Event]) -> List[int]:
    """Return the ``ts`` value of every event, preserving input order.

    Args:
        events: Events to read timestamps from.

    Returns:
        A new list with one timestamp per event.
    """
    collected: List[int] = []
    for event in events:
        collected.append(event.ts)
    return collected


In [3]:

# ----------------------------
# Q1: Max subarray sum (Kadane)
# ----------------------------

#def max_subarray_sum(nums: List[int]) -> int:
    # Handles all-negative arrays correctly
 #   if not nums:
  #      return 0

   # current = best = nums[0]

    #for x in nums[1:]:
       # current = max(x, current + x)
        #best = max(best, current)
    #return best


#def q1_max_cumulative_delay(events: List[Event], train_id: int) -> int:
 #   return max_subarray_sum(delays_for_train(events, train_id))


In [16]:
def max_subarray_sum(nums: List[int]) -> int:
    """Return the largest sum over all non-empty contiguous runs of ``nums``.

    Uses a single pass that tracks the best sum of a run ending at the
    current element. Because both trackers start from the first element
    (not from 0), an all-negative input yields its largest element.

    Args:
        nums: Values to scan.

    Returns:
        The maximum contiguous sum, or 0 when ``nums`` is empty.
    """
    if len(nums) == 0:
        return 0

    running = nums[0]
    top = running
    for idx in range(1, len(nums)):
        value = nums[idx]
        # Either extend the run ending at the previous element or restart here.
        running = max(value, running + value)
        if running > top:
            top = running
    return top

def q1_max_cumulative_delay(events: List[Event], train_id: int) -> int:
    """Q1: maximum contiguous sum of the delays recorded for one train.

    Args:
        events: Events to scan.
        train_id: Train whose delays are considered.

    Returns:
        The result of ``max_subarray_sum`` applied to that train's delays.
    """
    train_delays = delays_for_train(events, train_id)
    return max_subarray_sum(train_delays)

In [4]:

# ----------------------------
# Q2: Passing pairs (0 then 1)
# ----------------------------

#def count_passing_pairs(directions: List[int]) -> int:
  #  zeros = 0
   # pairs = 0
    #for d in directions:
     #   if d == 0:
      #      zeros += 1
       # else:
        #    pairs += zeros
    #return pairs


#def q2_passing_pairs(events: List[Event]) -> int:
 #   return count_passing_pairs([e.direction for e in events])


In [18]:
def counting_passing_pars(direction: List[int]) -> int:
    """Count ordered pairs ``(i, j)`` with ``i < j`` where entry ``i`` is 0 and entry ``j`` is not.

    Args:
        direction: Sequence of direction flags; any value other than 0 is
            treated as the "non-zero" side of a pair.

    Returns:
        The number of such pairs (0 for an empty input).
    """
    zeros_so_far = 0
    total_pairs = 0
    for flag in direction:
        if flag != 0:
            # Every zero seen earlier forms one pair with this entry.
            total_pairs += zeros_so_far
            continue
        zeros_so_far += 1
    return total_pairs

def q2_passing_pairs(events: List[Event])-> int:
    """Q2: number of (0 then non-zero) direction pairs across the events, in input order.

    Args:
        events: Events whose ``direction`` fields are read.

    Returns:
        The pair count computed by ``counting_passing_pars``.
    """
    flags = []
    for event in events:
        flags.append(event.direction)
    return counting_passing_pars(flags)


In [5]:

# ----------------------------
# Q3: Longest gap between timestamps
# ----------------------------

#def max_consecutive_gap(values: List[int]) -> int:
 #   if len(values) < 2:
  #      return 0
   # values_sorted = sorted(values)
    #best = 0
    #for a, b in zip(values_sorted, values_sorted[1:]):
      #  best = max(best, b - a)
    #return best


#def q3_longest_telemetry_gap(events: List[Event]) -> int:
 #   return max_consecutive_gap(timestamps(events))


In [23]:
def max_consecutive_gap(timeStamps: List[int]) -> int:
    """Return the widest difference between neighbouring values once sorted.

    Args:
        timeStamps: Values in any order; the input list is not modified.

    Returns:
        The largest gap between adjacent sorted values, or 0 when fewer
        than two values are given.
    """
    if len(timeStamps) < 2:
        return 0

    ordered = sorted(timeStamps)
    widest = 0
    for idx in range(1, len(ordered)):
        step = ordered[idx] - ordered[idx - 1]
        if step > widest:
            widest = step
    return widest
def q3_longest_telemetry_gap(events: List[Event])-> int:
    """Q3: widest gap between consecutive event timestamps.

    Args:
        events: Events whose ``ts`` fields are compared.

    Returns:
        The result of ``max_consecutive_gap`` over all event timestamps.
    """
    all_ts = timestamps(events)
    return max_consecutive_gap(all_ts)

In [6]:

# ----------------------------
# Q4: Odd occurrence (XOR)
# ----------------------------

#def find_odd_occurrence(nums: List[int]) -> int:
 #   x = 0
  #  for n in nums:
   #     x ^= n
    #return x


In [26]:
def find_odd_occurence(nums: List[int]) -> int:
    """Return the XOR of all values in ``nums``.

    Values that appear an even number of times cancel out under XOR, so
    when exactly one value appears an odd number of times the result is
    that value.

    Args:
        nums: Integers to fold together.

    Returns:
        The XOR of every element (0 for an empty input).
    """
    x = 0
    for n in nums:
        x ^= n
    return x


# Correctly spelled alias, so callers using either spelling resolve to the
# same function on a freshly started kernel.
find_odd_occurrence = find_odd_occurence

In [7]:

# ----------------------------
# Q5: First duplicate (set)
# ----------------------------

#def first_duplicate(nums: List[int]) -> Optional[int]:
#    seen = set()
 #   for n in nums:
  #      if n in seen:
   #         return n
    #    seen.add(n)
    #return None


In [28]:
def first_duplicate(nums: List[int]) -> Optional[int]:
    """Return the first value whose second occurrence comes earliest in ``nums``.

    Args:
        nums: Values to scan from left to right.

    Returns:
        The first value encountered that was already seen earlier in the
        scan, or ``None`` when all values are distinct.
    """
    visited = set()
    for candidate in nums:
        size_before = len(visited)
        visited.add(candidate)
        # The set did not grow, so this value had been seen before.
        if len(visited) == size_before:
            return candidate
    return None

In [8]:

# # ----------------------------
# # Q6: Smallest missing positive integer
# # ----------------------------
#
# def smallest_missing_positive(nums: List[int]) -> int:
#     # Classic Codility "Missing Integer"
#     n = len(nums)
#     present = [False] * (n + 2)  # indices 0..n+1 (we ignore 0)
#     for x in nums:
#         if 1 <= x <= n + 1:
#             present[x] = True
#     for i in range(1, n + 2):
#         if not present[i]:
#             return i
#     return n + 2  # logically unreachable
#
#
# def q6_smallest_missing_positive_delay(events: List[Event]) -> int:
#     return smallest_missing_positive([e.delay_min for e in events])


In [31]:
def smallest_missing_positive(nums: List[int])-> int:
    """Return the smallest integer >= 1 that does not occur in ``nums``.

    With ``n`` inputs the answer always lies in ``1..n+1``, so only values
    in that range are recorded in a presence table.

    Args:
        nums: Integers to inspect; values outside ``1..n+1`` are ignored.

    Returns:
        The smallest missing positive integer (1 for an empty input).
    """
    n = len(nums)
    limit = n + 1
    seen = [False] * (limit + 1)  # slots 0..n+1; slot 0 is never consulted
    for value in nums:
        if not (1 <= value <= limit):
            continue
        seen[value] = True

    for candidate, is_present in enumerate(seen[1:], start=1):
        if not is_present:
            return candidate
    return n + 2  # not expected to be reached: at most n of the n+1 slots can be set

def q6_smallest_missing_positive_delay(events: List[Event]) -> int:
    """Q6: smallest positive integer that is not the ``delay_min`` of any event.

    Args:
        events: Events whose ``delay_min`` fields are inspected.

    Returns:
        The result of ``smallest_missing_positive`` over all delays.
    """
    all_delays = []
    for event in events:
        all_delays.append(event.delay_min)
    return smallest_missing_positive(all_delays)

In [35]:
# ----------------------------
# # Q7: Min length subarray with sum >= X (non-negative ints only)
# # ----------------------------
#
# def min_len_subarray_at_least_x(nums: List[int], x: int) -> int:
#     # Requires nums to be non-negative for sliding window guarantee
#     left = 0
#     total = 0
#     best = float("inf")
#     for right, val in enumerate(nums):
#         total += val
#         while total >= x and left <= right:
#             best = min(best, right - left + 1)
#             total -= nums[left]
#             left += 1
#     return 0 if best == float("inf") else int(best)
#
#
# def q7_min_events_to_reach_delay(events: List[Event], x: int) -> int:
#     # Use absolute delays to make it sliding-window friendly (all non-negative)
#     abs_delays = [abs(e.delay_min) for e in events]
#     return min_len_subarray_at_least_x(abs_delays, x)


In [37]:
def min_len_subarray_at_least_x(nums: List[int], x: int) -> int:
    """Return the length of the shortest contiguous window whose sum is at least ``x``.

    Sliding-window scan: the right edge advances one element at a time and
    the left edge is pulled forward while the window still reaches ``x``.
    The shrinking step is only guaranteed to be valid when the values are
    non-negative.

    Args:
        nums: Values to scan (expected to be non-negative).
        x: Threshold the window sum must reach.

    Returns:
        The shortest qualifying window length, or 0 when no window
        qualifies.
    """
    shortest = None
    window_sum = 0
    start = 0
    for end in range(len(nums)):
        window_sum += nums[end]
        while start <= end and window_sum >= x:
            width = end - start + 1
            if shortest is None or width < shortest:
                shortest = width
            window_sum -= nums[start]
            start += 1
    if shortest is None:
        return 0
    return shortest


def q7_min_events_to_reach_delay(events: List[Event], x: int) -> int:
    """Q7: fewest consecutive events whose absolute delays sum to at least ``x``.

    Args:
        events: Events in the order they should be scanned.
        x: Threshold for the summed absolute delay.

    Returns:
        The result of ``min_len_subarray_at_least_x`` on the absolute
        delays (0 when no run of events reaches ``x``).
    """
    # Absolute values keep every term non-negative, which the sliding window relies on.
    magnitudes = []
    for event in events:
        magnitudes.append(abs(event.delay_min))
    return min_len_subarray_at_least_x(magnitudes, x)

In [10]:
# ----------------------------
# Q8: Max overlap of derived intervals
# interval = [ts, ts + abs(delay)]
# ----------------------------

def max_interval_overlap(intervals: List[Tuple[int, int]]) -> int:
    """Return the largest number of intervals that are open at the same point.

    Start and end points are sorted independently and merged in a single
    sweep. A start equal to an end is processed first, so intervals are
    treated as closed: ``(1, 2)`` and ``(2, 3)`` overlap at 2.

    Args:
        intervals: ``(start, end)`` pairs.

    Returns:
        The maximum number of simultaneously active intervals (0 for an
        empty input).
    """
    if len(intervals) == 0:
        return 0

    opening = sorted(pair[0] for pair in intervals)
    closing = sorted(pair[1] for pair in intervals)

    closed = 0   # index of the next end point not yet retired
    active = 0
    peak = 0
    for begin in opening:
        # Retire every interval that ended strictly before this start.
        while begin > closing[closed]:
            active -= 1
            closed += 1
        active += 1
        if active > peak:
            peak = active
    return peak


def q8_max_overlap_from_events(events: List[Event]) -> int:
    """Q8: maximum overlap of the intervals ``[ts, ts + abs(delay_min)]`` derived from the events.

    Args:
        events: Events from which one interval each is derived.

    Returns:
        The result of ``max_interval_overlap`` on the derived intervals.
    """
    spans = []
    for event in events:
        begin = event.ts
        spans.append((begin, begin + abs(event.delay_min)))
    return max_interval_overlap(spans)



In [38]:
# ----------------------------
# Mini "Codility-style" runner
# ----------------------------

def run_demo() -> None:
    """Run every question against the sample data, print the answers and check them.

    Raises:
        AssertionError: If any answer differs from its expected value.
    """
    # Q1: delays of train 101 are [2, -1, 3, 5] => best contiguous sum is 9
    ans1 = q1_max_cumulative_delay(EVENTS, 101)
    # Q2: directions [0,0,1,1,0,1,0,1] => 2 + 2 + 3 + 4 = 11 pairs
    ans2 = q2_passing_pairs(EVENTS)
    # Q3: sorted timestamps 1,2,3,5,6,7,8,10 => widest gap is 2
    ans3 = q3_longest_telemetry_gap(EVENTS)
    # Q4: 103 is the only id that appears an odd number of times.
    # The function is called by the name it is defined under in this notebook
    # (spelled "occurence"), so the demo works on a freshly started kernel.
    odd_train = find_odd_occurence([101, 101, 102, 102, 103, 103, 103])
    # Q5: 101 is the first train_id seen twice
    first_dup = first_duplicate([e.train_id for e in EVENTS])
    # Q6: delays include 1..5 => smallest missing positive is 6
    ans6 = q6_smallest_missing_positive_delay(EVENTS)
    # Q7: abs delays [2,1,4,3,0,2,5,1] => shortest window with sum >= 7 has length 2 (4+3 or 2+5)
    ans7 = q7_min_events_to_reach_delay(EVENTS, 7)
    # Q8: closed intervals [1,3], [2,3], [3,7] all contain ts=3 => overlap of 3
    ans8 = q8_max_overlap_from_events(EVENTS)

    print("Results:")
    print("Q1 max cumulative delay for train 101:", ans1)
    print("Q2 passing pairs (0->1):", ans2)
    print("Q3 longest telemetry gap:", ans3)
    print("Q4 odd-occurring train_id:", odd_train)
    print("Q5 first duplicate train_id:", first_dup)
    print("Q6 smallest missing positive delay:", ans6)
    print("Q7 min events (abs delay sum >= 7):", ans7)
    print("Q8 max interval overlap:", ans8)

    # Quick assertions (like Codility hidden tests, but visible)
    assert ans1 == 9
    assert ans2 == 11
    assert ans3 == 2
    assert odd_train == 103
    assert first_dup == 101
    assert ans6 == 6
    assert ans7 == 2
    assert ans8 == 3

    print("\nAll checks passed ✅")


# Run the demo only when this code executes as the main module
# (not when it is imported from elsewhere).
if __name__ == "__main__":
    run_demo()

Results:
Q1 max cumulative delay for train 101: 9
Q2 passing pairs (0->1): 11
Q3 longest telemetry gap: 2
Q4 odd-occurring train_id: 103
Q5 first duplicate train_id: 101
Q6 smallest missing positive delay: 6
Q7 min events (abs delay sum >= 7): 2
Q8 max interval overlap: 3

All checks passed ✅


In [30]:
def first_duplicate_train_id(events):
    """Return the first ``train_id`` that repeats while scanning ``events`` in order.

    Args:
        events: Objects exposing a ``train_id`` attribute.

    Returns:
        The first ``train_id`` that was already seen earlier in the scan,
        or -1 when every ``train_id`` is unique.
    """
    known_ids = set()
    for event in events:
        current = event.train_id
        if current not in known_ids:
            known_ids.add(current)
            continue
        return current
    return -1

# Demo call on the sample data: train 101 appears at ts=1 and again at ts=2,
# so it is the first repeated id.
print(first_duplicate_train_id(EVENTS))  # 101


101
