# Day 15: Beacon Exclusion Zone

## Part 1

In [None]:
from aoc_2023 import core


# Sample sensor report used by the example cells below: one
# "Sensor at x=.., y=..: closest beacon is at x=.., y=.." line per sensor.
_example = """Sensor at x=2, y=18: closest beacon is at x=-2, y=15
Sensor at x=9, y=16: closest beacon is at x=10, y=16
Sensor at x=13, y=2: closest beacon is at x=15, y=3
Sensor at x=12, y=14: closest beacon is at x=10, y=16
Sensor at x=10, y=20: closest beacon is at x=10, y=16
Sensor at x=14, y=17: closest beacon is at x=10, y=16
Sensor at x=8, y=7: closest beacon is at x=2, y=10
Sensor at x=2, y=0: closest beacon is at x=2, y=10
Sensor at x=0, y=11: closest beacon is at x=2, y=10
Sensor at x=20, y=14: closest beacon is at x=25, y=17
Sensor at x=17, y=20: closest beacon is at x=21, y=22
Sensor at x=16, y=7: closest beacon is at x=15, y=3
Sensor at x=14, y=3: closest beacon is at x=15, y=3
Sensor at x=20, y=1: closest beacon is at x=15, y=3"""
# Full puzzle input, loaded through the project's `core.read_input` helper.
# NOTE(review): presumably returns the file contents as a single string, since
# the value is passed to functions declared as taking `s: str` -- confirm
# against aoc_2023.core.
_test = core.read_input("../data/day_15.txt")

In [None]:
import re

from dataclasses import dataclass


@dataclass(frozen=True)
class Position:
    """Integer grid coordinate; frozen, so instances are immutable and hashable."""

    # Horizontal coordinate.
    x: int
    # Vertical coordinate (the "row" that the interval helpers are queried with).
    y: int

        
def manhattan_distance(a: Position, b: Position):
    """Return the taxicab distance ``|a.x - b.x| + |a.y - b.y|`` between two positions."""
    horizontal = abs(a.x - b.x)
    vertical = abs(a.y - b.y)
    return horizontal + vertical
        

def intersection(sensor: Position, beacon: Position, y: int) -> tuple[int, int] | None:
    """Return the inclusive x-range of row ``y`` that the sensor's diamond covers.

    The diamond is every position whose Manhattan distance to ``sensor`` is at
    most the distance from ``sensor`` to ``beacon``.

    Returns:
        ``(x_min, x_max)`` for the covered part of the row, or ``None`` when
        the row is farther from the sensor than that distance.
    """
    radius = manhattan_distance(sensor, beacon)
    # Each step away from the sensor's own row shrinks the covered width by one on both sides.
    half_width = radius - abs(y - sensor.y)
    if half_width < 0:
        return None
    return (sensor.x - half_width, sensor.x + half_width)


# Captures sensor x, sensor y, beacon x, beacon y (each possibly negative) from one report line.
_REPORT_LINE = re.compile(r".*x=(-?\d+), y=(-?\d+).*x=(-?\d+), y=(-?\d+)")


def parse(s: str) -> list[tuple[Position, Position]]:
    """Parse a sensor report into ``(sensor, beacon)`` position pairs.

    Args:
        s: Report text with one ``... x=<int>, y=<int> ... x=<int>, y=<int>``
            entry per line. Empty and whitespace-only lines are ignored, so
            trailing newlines and CRLF line endings are accepted.

    Returns:
        One ``(sensor, beacon)`` tuple per non-blank line, in input order.

    Raises:
        ValueError: If a non-blank line does not contain two ``x=.., y=..``
            coordinate pairs.
    """
    pairs = []
    for line in s.splitlines():
        if not line.strip():
            continue
        found = _REPORT_LINE.search(line)
        if found is None:
            # Fail with the offending text instead of an opaque "NoneType is not subscriptable".
            raise ValueError(f"unrecognised sensor report line: {line!r}")
        sensor_x, sensor_y, beacon_x, beacon_y = map(int, found.groups())
        pairs.append((Position(sensor_x, sensor_y), Position(beacon_x, beacon_y)))
    return pairs

In [None]:
import functools


def _merge_sorted_spans(spans: list[tuple[int, int]]) -> list[tuple[int, int]]:
    """Merge inclusive integer spans, sorted by start, that overlap or touch."""
    merged = []
    for lo, hi in spans:
        # Because the spans are sorted by start, only the most recently merged
        # span can overlap the next one. ``lo - 1`` also joins spans that are
        # directly adjacent (no integer between them).
        if merged and merged[-1][1] >= lo - 1:
            prev_lo, prev_hi = merged[-1]
            merged[-1] = (prev_lo, max(prev_hi, hi))
        else:
            merged.append((lo, hi))
    return merged


def intersections(pairs: list[tuple[Position, Position]], a: int, b: int) -> list[list[tuple[int, int]]]:
    """Compute, for every row ``y`` in ``a..b`` inclusive, the merged x-ranges covered by the sensors.

    Args:
        pairs: ``(sensor, beacon)`` position pairs.
        a: First row to evaluate.
        b: Last row to evaluate (inclusive).

    Returns:
        One list per row, in increasing ``y``. Each list holds inclusive
        ``(x_min, x_max)`` tuples sorted by ``x_min``, with overlapping or
        adjacent ranges merged; a row no sensor reaches yields an empty list.
    """
    rows = []
    for y in range(a, b + 1):
        spans = sorted(
            span
            for sensor, beacon in pairs
            if (span := intersection(sensor, beacon, y)) is not None
        )
        rows.append(_merge_sorted_spans(spans))
    return rows

In [None]:
def part_1(s: str, y: int) -> int:
    """Count the positions on row ``y`` that are covered by a sensor and hold no known beacon.

    Args:
        s: Sensor report text accepted by ``parse``.
        y: Row to inspect.

    Returns:
        Total length of the merged covered x-ranges on the row, minus the
        number of distinct reported beacons lying on that row inside them.
    """
    pairs = parse(s)
    # A single-row query yields exactly one list of merged ranges; the strict
    # unpacking fails loudly if that ever stops being true.
    (row,) = intersections(pairs, y, y)

    # Several sensors may report the same beacon, so collect x-coordinates in a set.
    beacons_on_row = {
        beacon.x
        for _, beacon in pairs
        if beacon.y == y and any(lo <= beacon.x <= hi for lo, hi in row)
    }

    return sum(hi - lo + 1 for lo, hi in row) - len(beacons_on_row)

In [None]:
# Part 1 on the sample report, inspecting row y=10.
part_1(_example, y=10)

26

In [None]:
# Part 1 on the full puzzle input, inspecting row y=2000000.
part_1(_test, y=2000000)

4717631

## Part 2

In [None]:
import asyncio
import concurrent.futures


def _row_ranges(min_value: int, max_value: int, num_batches: int) -> list[tuple[int, int]]:
    """Split rows ``min_value..max_value`` (inclusive) into at most ``num_batches`` contiguous inclusive ranges."""
    total = max_value - min_value + 1
    if total <= 0:
        return []
    # Ceiling division: rounding down would leave the trailing rows (at least
    # ``max_value`` itself) outside every batch.
    size = -(-total // num_batches)
    return [
        (first, min(first + size - 1, max_value))
        for first in range(min_value, max_value + 1, size)
    ]


def _first_uncovered(row: list[tuple[int, int]], min_value: int, max_value: int) -> int | None:
    """Return the smallest x in ``min_value..max_value`` outside every span of ``row``, or ``None``.

    ``row`` must hold inclusive ``(x_min, x_max)`` spans sorted by ``x_min``.
    """
    candidate = min_value
    for lo, hi in row:
        if lo > candidate:
            # This span and every later one start to the right of the candidate.
            break
        candidate = max(candidate, hi + 1)
    return candidate if candidate <= max_value else None


async def part_2(s: str, min_value: int, max_value: int, num_workers: int) -> int:
    """Find the first position in the search square not covered by any sensor and return its tuning frequency.

    The rows ``min_value..max_value`` are split into at most ``num_workers``
    contiguous batches, each evaluated by ``intersections`` in a process pool.
    Rows are then inspected in increasing ``y`` for an x in
    ``min_value..max_value`` that no merged range covers.

    Args:
        s: Sensor report text accepted by ``parse``.
        min_value: Smallest x and y of the search square (inclusive).
        max_value: Largest x and y of the search square (inclusive).
        num_workers: Number of row batches submitted to the pool; the pool
            itself keeps the executor's default process count.

    Returns:
        ``x * 4_000_000 + y`` for the first uncovered position found.

    Raises:
        ValueError: If ``num_workers`` is less than 1, or if every position in
            the search square is covered.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")

    loop = asyncio.get_running_loop()
    pairs = parse(s)
    with concurrent.futures.ProcessPoolExecutor() as pool:
        chunks = await asyncio.gather(*[
            loop.run_in_executor(pool, intersections, pairs, first, last)
            for first, last in _row_ranges(min_value, max_value, num_workers)
        ])

    # gather() returns results in submission order and the batches are
    # contiguous, so the rows come back in increasing y starting at min_value.
    rows = (row for chunk in chunks for row in chunk)
    for y, row in enumerate(rows, start=min_value):
        x = _first_uncovered(row, min_value, max_value)
        if x is not None:
            return x * 4_000_000 + y
    raise ValueError("every position in the search square is covered by a sensor")

In [None]:
# Part 2 on the sample report with bounds 0 and 20, using a single batch.
await part_2(_example, 0, 20, num_workers=1)

56000011

In [None]:
# Part 2 on the full puzzle input with bounds 0 and 4_000_000, split into 16 batches.
await part_2(_test, 0, 4_000_000, num_workers=16)

13197439355220