In [1]:
# Notebook configuration: every path lives under the same data directory.
DATA_DIR = "../_data"

# Input captures, read as CSV with "Timestamp", "Edge" and "Signal" columns.
DATAFILES = [
    f"{DATA_DIR}/{stem}.csv" for stem in ("data1", "data2", "data3", "data4")
]

# Names of the signals kept from the captures; rows for other signals are ignored.
SIGNALS = ["BA", "MU"]

# Output CSV files written by the notebook.
DURATIONSFILE = f"{DATA_DIR}/durations.csv"
CODESFILE = f"{DATA_DIR}/codes.csv"

In [2]:
import csv
import warnings
from collections import defaultdict
from datetime import timedelta, datetime
from pathlib import Path
from typing import Generic, Iterable, Iterator, TypeVar

In [3]:
class Signal:
    """Chronological record of the value changes of one signal.

    ``changes`` holds ``(timestamp, value)`` pairs in non-decreasing timestamp
    order and always starts with the initial state ``(0, 0)``.
    """

    def __init__(self):
        self.changes: list[tuple[int, int]] = [(0, 0)]

    def change(self, timestamp: int, value: int):
        """Record that the signal takes ``value`` at ``timestamp``.

        A change at the same timestamp as the latest recorded one overwrites
        it instead of adding a new entry.

        Raises:
            ValueError: if ``timestamp`` is earlier than the latest recorded
                timestamp.
        """
        previous_timestamp = self.changes[-1][0]
        if timestamp < previous_timestamp:
            raise ValueError(
                f"Timestamp {timestamp} is earlier than last timestamp {previous_timestamp}"
            )
        if timestamp == previous_timestamp:
            # Same instant: the newest value wins.
            self.changes[-1] = (timestamp, value)
        else:
            self.changes.append((timestamp, value))

    def dedup(self):
        """Drop every entry whose value equals the value of the entry kept before it."""
        kept: list[tuple[int, int]] = []
        previous_value = None
        for when, level in self.changes:
            if level != previous_value:
                kept.append((when, level))
                previous_value = level
        self.changes = kept

In [4]:
# Parse every capture file into one Signal per tracked signal name, keyed by
# the file stem. Only rows whose edge is "F" (value 0) or "R" (value 1) and
# whose signal is listed in SIGNALS are kept.
signals: dict[str, dict[str, Signal]] = {}
for fpath in DATAFILES:
    filepath = Path(fpath).absolute().resolve()
    name = filepath.stem
    per_file: dict[str, Signal] = defaultdict(Signal)
    signals[name] = per_file
    with filepath.open("r", newline="") as f:
        for row in csv.DictReader(f):
            # The timestamp is parsed for every row, including ignored ones.
            timestamp = int(row["Timestamp"])
            if row["Edge"] in ("F", "R") and row["Signal"] in SIGNALS:
                level = 1 if row["Edge"] == "R" else 0
                per_file[row["Signal"]].change(timestamp, level)
    # Collapse repeated edges that did not actually change the value.
    for parsed in per_file.values():
        parsed.dedup()

In [5]:
def interlace(signals: dict[str, Signal]) -> Iterator[tuple[str, int, int]]:
    """Interlace the changes of all signals and yield them in order.

    Args:
        signals: mapping of signal name to a signal whose ``changes`` are
            ``(timestamp, value)`` pairs sorted by timestamp.

    Yields:
        ``(name, timestamp, value)`` triples in non-decreasing timestamp
        order. When several signals share the smallest timestamp, the one
        that comes first in ``signals`` is yielded first.

    An empty mapping, or a signal without any change, contributes nothing
    instead of raising.
    """
    iterators = {name: iter(signal.changes) for name, signal in signals.items()}

    # Pending change of every signal that still has something to deliver.
    current: dict[str, tuple[int, int]] = {}
    for name, it in iterators.items():
        first = next(it, None)
        if first is not None:
            current[name] = first

    while current:
        nextsignal, change = min(current.items(), key=lambda x: x[1][0])
        yield nextsignal, *change
        following = next(iterators[nextsignal], None)
        if following is None:
            # This signal is exhausted: stop considering it.
            del current[nextsignal]
        else:
            current[nextsignal] = following

In [6]:
# Given the first visual analysis, what is of interest is the pulses on MU
# when BA is high. Both signals of each capture are therefore folded into a
# single tri-state signal:
#   0 => BA inactive
#   1 => BA active but MU inactive
#   2 => BA active and MU active
combined: dict[str, Signal] = {}
for name, filesignals in signals.items():
    merged = Signal()
    combined[name] = merged
    levels = {"BA": 0, "MU": 0}  # latest known level of each signal
    for signame, timestamp, value in interlace(filesignals):
        if signame in levels:
            levels[signame] = 1 if value else 0
        merged.change(timestamp, (levels["MU"] + 1) * levels["BA"])
    merged.dedup()

In [7]:
def durationsof(signal: Signal) -> Iterator[tuple[int, int, int]]:
    """Calculate the durations of each state in the signal.

    Args:
        signal: signal whose ``changes`` are ``(timestamp, value)`` pairs
            sorted by timestamp.

    Yields:
        ``(start, duration, value)`` triples, one per recorded state. The
        duration is the distance to the next change; the final state has no
        known end and is reported with a duration of 0.

    A signal without any change yields nothing.
    """
    it = iter(signal.changes)
    first = next(it, None)
    if first is None:
        # Letting StopIteration escape a generator would become a RuntimeError.
        return
    last_timestamp, last_value = first
    for timestamp, value in it:
        yield last_timestamp, timestamp - last_timestamp, last_value
        last_timestamp = timestamp
        last_value = value
    yield last_timestamp, 0, last_value  # Final state with zero duration

In [8]:
# Compute the state durations of every combined signal, keeping them in
# memory (``durations``) and exporting them to DURATIONSFILE at the same time.
pDURATIONSFILE = Path(DURATIONSFILE).absolute().resolve()
durations: dict[str, list[tuple[int, int, int]]] = defaultdict(list)
with pDURATIONSFILE.open("w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["File", "Start", "Duration", "State"])
    for name, signal in combined.items():
        for record in durationsof(signal):
            # record is (start, duration, state)
            durations[name].append(record)
            writer.writerow([name, *record])

In [9]:
T = TypeVar("T")


class lookahead(Generic[T]):
    """Iterator wrapper that allows backtracking over tentatively read items.

    Items handed out by ``next()`` are kept in ``buffer`` until the caller
    decides what to do with them:

    * ``accept()`` forgets the items read so far; iteration continues after
      them.
    * ``reject()`` forgets only the oldest buffered item and rewinds, so the
      items read after it are handed out again before any new item is pulled
      from the underlying iterator.
    """

    def __init__(self, iterable: Iterable[T]):
        self.iterator = iter(iterable)
        # Items pulled from the iterator that are not accepted/rejected yet.
        self.buffer: list[T] = []
        # Index in ``buffer`` of the next item to hand out.
        self.readhead = 0

    def __iter__(self) -> "lookahead[T]":
        return self

    def __next__(self) -> T:
        # Only pull from the underlying iterator once every buffered item has
        # been replayed; otherwise buffered items would be lost when the
        # underlying iterator runs out.
        if self.readhead >= len(self.buffer):
            self.buffer.append(next(self.iterator))
        item = self.buffer[self.readhead]
        self.readhead += 1
        return item

    def accept(self):
        """Discard the items read so far, keeping any not yet replayed."""
        self.buffer = self.buffer[self.readhead :]
        self.readhead = 0

    def reject(self):
        """Discard the oldest buffered item and replay the ones read after it."""
        self.buffer = self.buffer[1:]
        self.readhead = 0

In [10]:
# From further analysis, I'm interested in the following pattern:
# BA is high for 64ms, then low for ~30ms
# During this time, MU is low for 4ms, then high for 8ms, then there is some
# encoding (I guess 8ms pulses) with a final 4ms high (generally measured as 3ms)
def findpattern(
    durations: Iterable[tuple[int, int, int]],
) -> Iterator[tuple[int, list[tuple[int, int]]]]:
    """Scan ``(start, duration, value)`` records for candidate frames.

    A frame is recognised by a small state machine:

    * ``"out"``: waiting for a state of value 1 lasting 3 to 5 (inclusive),
      which opens a frame;
    * ``"start"``: collecting states of value 1 or 2 until their cumulated
      duration reaches 63; any other value aborts the candidate;
    * ``"stop"``: the frame is confirmed only if the next state has value 0.

    An aborted candidate is rewound so that scanning resumes with the record
    following the one that opened it.

    Yields:
        ``(timestamp, pulses)`` where ``timestamp`` is the start of the
        opening state and ``pulses`` the ``(duration, value)`` pairs of the
        frame, opening state included.
    """
    stream = lookahead(durations)
    step = "out"
    frame: list[tuple[int, int]] = []
    frame_start: int = 0
    elapsed: int = 0
    for start, duration, value in stream:
        if step == "out":
            if value != 1 or not (3 <= duration <= 5):
                # Not a frame opening: nothing to remember.
                stream.accept()
                continue
            step = "start"
            frame_start = start
            frame = [(duration, value)]
            elapsed = duration
        elif step == "start":
            if value in (1, 2):
                frame.append((duration, value))
                elapsed += duration
                if elapsed >= 63:
                    step = "stop"
            else:
                # Frame interrupted too early: rewind and rescan.
                stream.reject()
                step = "out"
        elif step == "stop":
            step = "out"
            if value == 0:
                stream.accept()
                yield frame_start, frame
            else:
                stream.reject()
        else:
            raise ValueError(f"Unknown step {step}")

In [11]:
# Candidate frames found in each capture, keyed like ``durations``.
patterns: dict[str, list[tuple[int, list[tuple[int, int]]]]] = {
    name: list(findpattern(records)) for name, records in durations.items()
}

In [12]:
# My hypothesis is that the pattern is a 4ms low, 8ms high, then 8ms pulses high
# or low depending on the value encoded, and a final 3ms (4ms but often shorten
# by the sampling) high.
def splitpattern(pulses: list[tuple[int, int]]) -> list[tuple[int, int]] | None:
    """Split the pattern into its components."""
    splitpulses: list[tuple[int, int]] = []
    it = iter(pulses)
    _first = next(it)
    splitpulses.append(
        (4, 0)
    )  # 4ms low, ensured by findpattern, no need to validate
    for _ in range(len(pulses) - 2):
        duration, value = next(it)
        n = round(duration / 8)
        for _ in range(n):
            splitpulses.append((8, value - 1))
    duration, value = next(it)  # Last pulse
    if duration <= 4:
        splitpulses.append((4, value - 1))
    else:
        d = duration - 3
        n = round(d / 8)
        for _ in range(n):
            splitpulses.append((8, value - 1))
        splitpulses.append((4, value - 1))  # Final 4ms high
    totalduration = sum(d for d, _ in splitpulses)
    if totalduration != 64:
        warnings.warn(f"Total duration {totalduration} is not 64ms")
        return None
    return splitpulses

In [13]:
# Split every candidate frame into slots and label it with its start time,
# the millisecond timestamp being rendered as a time of day "HH:MM:SS.mmm".
decodeds: list[tuple[str, str, None | list[tuple[int, int]]]] = []
origin = datetime(2000, 1, 1, 0, 0, 0)  # arbitrary date, only the time is kept
for name, frames in patterns.items():
    for timestamp, pulses_ in frames:
        splitpulses = splitpattern(pulses_)
        moment = origin + timedelta(milliseconds=timestamp)
        # "%f" gives microseconds; dropping 3 digits leaves milliseconds.
        decodeds.append((name, moment.strftime("%H:%M:%S.%f")[:-3], splitpulses))

In [14]:
def pulses2bits(pulses: list[tuple[int, int]]) -> str:
    """Render the values of the inner pulses as a ``0b``-prefixed bit string.

    The first and the last pulse are framing and are left out; every other
    pulse contributes the text of its value, in order.
    """
    payload = pulses[1:-1]
    return "0b" + "".join(str(level) for _, level in payload)

In [15]:
# One (file, "HH:MM", code) entry per frame; frames that could not be split
# into slots are reported as "Invalid".
codes: list[tuple[str, str, str]] = [
    (name, ts[:5], pulses2bits(pulses) if pulses else "Invalid")
    for name, ts, pulses in decodeds
]

In [16]:
# Export the decoded frames to CODESFILE, one row per frame under a header.
pCODESFILE = Path(CODESFILE).absolute().resolve()
with pCODESFILE.open("w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["File", "Time", "Code"])
    writer.writerows(codes)

Final decoding gives the following code:

```
0b1TTUUUU
```

with `TT` encoding the tens digit (0, 1, 2) and `UUUU` encoding the units digit.
Thus, for example:

```
01:00 => 0b1000001
03:00 => 0b1000011
10:00 => 0b1010000
13:00 => 0b1010011
23:00 => 0b1100011
```

However, I have no trace of `00:00`.

Full protocol is as follows:

`BA` goes high. 4ms later, `MU` starts pulsing its encoding: first an 8ms high
pulse (the first bit), then the 6 bits encoding the time (`TT` and `UUUU`),
each lasting 8ms, and finally a 4ms high pulse that ends the frame, with `MU`
and `BA` falling at the same time. The entire frame lasts 64ms.