# Day 16: BITS
The input for this problem is located at https://adventofcode.com/2021/day/16/input

In [1]:
import binascii
import enum
import io
import operator
import typing

%load_ext numpy_html

## Part 1

We *could* solve this with ASCII text, but I'd prefer to implement a *proper* solution using a bit-stream. 

### Reading bits from a bytes stream

In [2]:
def bit_mask(n: int) -> int:
    """Return an integer whose ``n`` least-significant bits are all set."""
    # 2**n has a single set bit at position n; one less sets every bit below it.
    power_of_two = 1 << n
    return power_of_two - 1

In [3]:
class BitStream:
    """Reader that extracts big-endian bit fields from a seekable byte stream.

    The reader keeps its own cursor, measured in bits from the start of the
    stream, and seeks the underlying stream before every read.
    """

    def __init__(self, stream: io.IOBase):
        # Underlying binary stream; it is re-seeked on every read.
        self._stream = stream
        # Cursor position, in bits from the start of the stream.
        self._offset = 0

    @classmethod
    def from_bytes(cls, bytes_):
        """Build a bit stream over an in-memory bytes-like object."""
        return cls(io.BytesIO(bytes_))

    @property
    def offset(self) -> int:
        """Current cursor position, in bits."""
        return self._offset

    @property
    def location(self) -> tuple[int, int]:
        """Cursor position as ``(byte index, bit index within that byte)``."""
        return divmod(self._offset, 8)

    def skip_bits(self, n: int):
        """Advance the cursor by ``n`` bits without reading anything."""
        self._offset += n

    def seek_next_byte(self):
        """Advance the cursor to the start of the following byte.

        A cursor that is already byte-aligned still moves forward by a whole
        byte.
        """
        self._offset += 8 - (self._offset % 8)

    def read_integer(self, n_bits: int) -> int:
        """Read the next ``n_bits`` bits as an unsigned big-endian integer.

        The cursor advances by ``n_bits`` on success and is left untouched
        when an exception is raised.

        Raises:
            ValueError: If ``n_bits`` is negative.
            EOFError: If the stream returns fewer bytes than the field needs.
        """
        if n_bits < 0:
            raise ValueError("n_bits must be non-negative")

        byte_start, bit_start = divmod(self._offset, 8)
        # Fetch only the bytes that overlap the requested field. Asking for an
        # extra byte would come back short when the field ends exactly at the
        # end of the stream, which would make the shift below wrong.
        n_bytes = (bit_start + n_bits + 7) // 8
        self._stream.seek(byte_start)
        raw = self._stream.read(n_bytes)
        if len(raw) < n_bytes:
            raise EOFError(
                f"cannot read {n_bits} bits at bit offset {self._offset}: "
                "end of stream reached"
            )
        self._offset += n_bits

        # Bits of the last fetched byte that lie beyond the requested field.
        trailing_bits = n_bytes * 8 - (bit_start + n_bits)
        content = int.from_bytes(raw, "big")
        # The mask drops the bits that precede the field in the first byte.
        return (content >> trailing_bits) & bit_mask(n_bits)

### Generating events
We could create a parse tree here. However, to support partial reads, I'd prefer to implement an event stream, where the reading can stop before all packets are consumed from the byte stream. This event stream is unsurprisingly comprised of "events", with packet boundaries to permit reconstruction of the parse tree.

In [4]:
@enum.unique
class StreamEvent(enum.Enum):
    """Kinds of event emitted while decoding a transmission."""

    # A packet header has been read.
    BEGIN_PACKET = enum.auto()
    # An operator announces the total bit length of its sub-packets.
    READ_UNTIL = enum.auto()
    # An operator announces how many sub-packets follow.
    READ_SEVERAL = enum.auto()
    # A literal value has been decoded.
    EMIT_VALUE = enum.auto()
    # The contents of a packet have been fully read.
    END_PACKET = enum.auto()

As indicated, to read a literal we build an integer from repeated shifts of the non-contiguous chunks. This process finishes with the generation of an `EMIT_VALUE` event.

In [5]:
def read_literal(stream):
    """Decode a literal value and yield a single ``EMIT_VALUE`` event.

    The literal is stored as a run of 5-bit groups: a continuation flag
    followed by four payload bits. The groups are concatenated most
    significant first, and the last group has a zero flag.
    """
    value = 0
    more_groups = True
    while more_groups:
        more_groups = bool(stream.read_integer(1))
        # Make room for the next four payload bits and append them.
        value = (value << 4) | stream.read_integer(4)
    yield StreamEvent.EMIT_VALUE, value

To read an operator, we switch on the length-type bit (which selects between a sub-packet count and a total bit length for the sub-packets), before generating the corresponding event.

In [6]:
def read_operator(stream):
    """Yield the events for the body of an operator packet.

    A one-bit mode flag selects how the sub-packets are delimited:

    * set: an 11-bit sub-packet count follows; ``READ_SEVERAL`` is emitted
      and exactly that many packets are read.
    * clear: a 15-bit length in bits follows; ``READ_UNTIL`` is emitted and
      packets are read until that many bits have been consumed.

    Raises:
        ValueError: If, in length mode, the sub-packets do not end exactly on
            the declared boundary.
    """
    mode = stream.read_integer(1)
    if mode:
        n_packets = stream.read_integer(11)
        yield StreamEvent.READ_SEVERAL, n_packets
        for _ in range(n_packets):
            yield from read_packet(stream)
    else:
        length = stream.read_integer(15)
        yield StreamEvent.READ_UNTIL, length

        end = stream.offset + length
        while stream.offset < end:
            yield from read_packet(stream)
        # Raise explicitly rather than assert: this validates input data and
        # must keep working when assertions are disabled (python -O).
        if stream.offset != end:
            raise ValueError(
                f"sub-packets overran their declared length of {length} bits "
                f"by {stream.offset - end} bits"
            )

The header is common to all packets, so we create a nice type for it here.

In [7]:
class PacketHeader(typing.NamedTuple):
    """Header shared by every packet: two consecutive 3-bit fields."""

    # First 3-bit field of the packet.
    version: int
    # Second 3-bit field; read_packet treats 4 as a literal value.
    type: int


def read_packet_header(stream):
    """Read a packet header (3-bit version, then 3-bit type) from ``stream``."""
    # Keyword arguments are evaluated left to right, so the version is read
    # from the stream before the type.
    return PacketHeader(
        version=stream.read_integer(3),
        type=stream.read_integer(3),
    )

In [8]:
def read_packet(stream):
    """Yield the events describing one packet, including nested packets.

    The first event is ``(BEGIN_PACKET, header, location)`` and the last is
    ``(END_PACKET, header, location)``, where ``location`` is the stream
    position at the moment the event is produced. The events of the packet
    body are yielded in between.
    """
    header = read_packet_header(stream)
    yield (StreamEvent.BEGIN_PACKET, header, stream.location)

    # Type 4 is a literal value; every other type is an operator.
    read_body = read_literal if header.type == 4 else read_operator
    yield from read_body(stream)

    yield (StreamEvent.END_PACKET, header, stream.location)

Load the problem.

In [9]:
# Decode the hexadecimal transmission into raw bytes. Each line is stripped
# first because unhexlify rejects the trailing newline; blank lines are skipped.
with open("input.txt") as f:
    data = b"".join(
        binascii.unhexlify(hex_digits)
        for line in f
        if (hex_digits := line.strip())
    )

In [10]:
# Part 1 answer: the sum of the version numbers of every packet header.
events = read_packet(BitStream.from_bytes(data))
headers = (header for kind, header, *_ in events if kind == StreamEvent.BEGIN_PACKET)
sum(header.version for header in headers)

908

## Part 2

### Packet Types
We have a concrete set of operator packet types. Let's enumerate them here:

In [11]:
@enum.unique
class PacketType(enum.IntEnum):
    """Packet type IDs; every member except ``VALUE`` is an operator."""

    SUM = 0
    PRODUCT = 1
    MINIMUM = 2
    MAXIMUM = 3
    # The only non-operator type: the packet carries a literal value.
    VALUE = 4
    GREATER_THAN = 5
    LESS_THAN = 6
    EQUAL_TO = 7

Each operator type has an associated reducer function:

In [12]:
# Binary reducer applied to the operand values of each operator packet type.
# The comparison reducers convert their result to int so that an evaluated
# packet is always 1 or 0 rather than True or False.
TYPE_TO_REDUCER = {
    PacketType.SUM: operator.add,
    PacketType.PRODUCT: operator.mul,
    PacketType.MINIMUM: min,
    PacketType.MAXIMUM: max,
    PacketType.GREATER_THAN: lambda a, b: int(a > b),
    PacketType.LESS_THAN: lambda a, b: int(a < b),
    PacketType.EQUAL_TO: lambda a, b: int(a == b),
}

To process the event stream, we can use a stack machine. For simple value packets, we can just push their contents to the stack. For operator packets, we need a bit more logic. Operator packets should reduce their arguments (the stack) at the end of the packet (`END_PACKET`). In order to do this without maintaining a history of the processed events, we can push a sentinel value to the stack upon entering an operator packet, such that reduction is terminated upon encountering this object:

In [13]:
class _Sentinel(typing.NamedTuple):
    """Marker object; ``kind`` is a label saying what the marker is for."""

    kind: str


# Pushed onto the evaluation stack to mark where an operator's operands begin.
REDUCE_SENTINEL = _Sentinel(kind="REDUCE_STOP")

The stack logic is as follows:

In [14]:
def evaluate_transmission(events):
    """Evaluate a packet event stream with a stack machine; return the result.

    Literal values are pushed as they are emitted. Entering an operator
    packet pushes ``REDUCE_SENTINEL``. Leaving one folds the values above the
    sentinel from the top of the stack downwards, calling
    ``reducer(lower, upper)``, and replaces them and the sentinel with the
    result.
    """
    stack = []

    for kind, *args in events:
        if kind == StreamEvent.EMIT_VALUE:
            stack.append(args[0])
            continue

        if kind == StreamEvent.BEGIN_PACKET:
            # Only operator packets need a marker for where their operands start.
            if args[0].type != PacketType.VALUE:
                stack.append(REDUCE_SENTINEL)
            continue

        if kind != StreamEvent.END_PACKET:
            continue

        reducer = TYPE_TO_REDUCER.get(args[0].type)
        if reducer is None:
            # No reducer for this type (e.g. a literal): nothing to fold.
            continue

        # Fold downwards: the running result is always the right-hand operand.
        result = stack.pop()
        while (operand := stack.pop()) is not REDUCE_SENTINEL:
            result = reducer(operand, result)
        stack.append(result)

    return stack.pop()

In [15]:
# Part 2 answer: build a fresh event generator over the transmission bytes
# (a generator can only be consumed once) and evaluate it.
events = read_packet(BitStream.from_bytes(data))

evaluate_transmission(events)

10626195124371