In [2]:
from pyftdi.i2c import I2cController
import struct, time, sys
from typing import List, Dict, Optional

class MPU6050:
    # --- I2C address ---
    DEFAULT_ADDR = 0x68

    # --- Registers ---
    REG_SMPLRT_DIV   = 0x19
    REG_CONFIG       = 0x1A
    REG_GYRO_CONFIG  = 0x1B
    REG_ACCEL_CONFIG = 0x1C
    REG_FIFO_EN      = 0x23
    REG_INT_ENABLE   = 0x38
    REG_INT_STATUS   = 0x3A
    REG_TEMP_OUT_H   = 0x41
    REG_TEMP_OUT_L   = 0x42
    REG_USER_CTRL    = 0x6A
    REG_PWR_MGMT_1   = 0x6B
    REG_PWR_MGMT_2   = 0x6C
    REG_FIFO_COUNTH  = 0x72
    REG_FIFO_COUNTL  = 0x73
    REG_FIFO_R_W     = 0x74
    REG_WHO_AM_I     = 0x75

    # --- Scales (¬±2 g, ¬±250 ¬∞/s) ---
    ACCEL_SCALE = 16384.0
    GYRO_SCALE  = 131.0

    def __init__(
        self,
        url: str = "ftdi://ftdi:232h/1",
        bus_rate: int = 400_000,
        addr: int = DEFAULT_ADDR,
        sample_div: int = 4,      # 1kHz/(1+div) => 200 Hz
        dlpf_cfg: int = 0x03,     # 42 Hz
        accel_cfg: int = 0x00,    # ¬±2 g
        gyro_cfg: int = 0x00,     # ¬±250 dps
        verbose: bool = True,
        check_id: bool = True,
    ):
        self.verbose = verbose
        self.addr = addr

        # --- I2C setup ---
        self.i2c = I2cController()
        self.i2c.configure(f"{url}?frequency={bus_rate}")
        self.mpu = self.i2c.get_port(self.addr)

        if self.verbose:
            print(f"üß© Connected via {url} @ {bus_rate} Hz, addr=0x{self.addr:02X}")

        # Optional identity check
        if check_id:
            who = self._read8(self.REG_WHO_AM_I)
            print(f"üÜî WHO_AM_I = 0x{who:02X}")
            if who != 0x68 and self.verbose:
                print("‚ö†Ô∏è Unexpected WHO_AM_I (check AD0/wiring)")

        # Init
        self._soft_init(sample_div, dlpf_cfg, accel_cfg, gyro_cfg)
        if self.verbose:
            self.dump_regs()

    # --------- Low-level I¬≤C helpers ---------
    def _write8(self, reg: int, val: int) -> None:
        self.mpu.write_to(reg, bytes([val & 0xFF]))
        time.sleep(0.002)

    def _read8(self, reg: int) -> int:
        return self.mpu.exchange([reg], 1)[0]

    def _read16_be(self, reg: int) -> int:
        return struct.unpack('>H', self.mpu.exchange([reg], 2))[0]

    def _fifo_read(self, nbytes: int) -> bytes:
        return self.mpu.exchange([self.REG_FIFO_R_W], nbytes)

    def _fifo_reset(self) -> None:
        self._write8(self.REG_USER_CTRL, self._read8(self.REG_USER_CTRL) | 0x04)
        time.sleep(0.01)
        self._write8(self.REG_USER_CTRL, self._read8(self.REG_USER_CTRL) & ~0x04)
        time.sleep(0.01)

    # --------- Temperature ---------
    def read_temperature(self) -> float:
        """Read temperature once (¬∞C) using datasheet formula."""
        h = self._read8(self.REG_TEMP_OUT_H)
        l = self._read8(self.REG_TEMP_OUT_L)
        raw = (h << 8) | l
        if raw >= 0x8000:
            raw -= 0x10000
        return raw / 340.0 + 36.53

    # --------- Init / control ---------
    def _soft_init(self, sample_div: int, dlpf_cfg: int, accel_cfg: int, gyro_cfg: int) -> None:
        # No hard reset (clones may NACK after 0x80)
        self._write8(self.REG_PWR_MGMT_1, 0x01)  # PLL X-gyro
        self._write8(self.REG_PWR_MGMT_2, 0x00)  # all sensors on
        time.sleep(0.05)

        # Basic config
        self._write8(self.REG_CONFIG, dlpf_cfg & 0x07)
        self._write8(self.REG_SMPLRT_DIV, sample_div & 0xFF)
        self._write8(self.REG_GYRO_CONFIG, gyro_cfg & 0x18)   # FS bits [4:3]
        self._write8(self.REG_ACCEL_CONFIG, accel_cfg & 0x18) # FS bits [4:3]

        # FIFO off, reset
        self._write8(self.REG_USER_CTRL, 0x00)
        self._write8(self.REG_FIFO_EN, 0x00)
        self._fifo_reset()
        time.sleep(0.05)

        # Enable FIFO logic + select accel+gyro
        self._write8(self.REG_USER_CTRL, 0x40)   # FIFO_EN (logic)
        self._write8(self.REG_FIFO_EN, 0x78)     # XG|YG|ZG|ACCEL
        self._write8(self.REG_INT_ENABLE, 0x01)  # data-ready (optional)
        _ = self._read8(self.REG_INT_STATUS)     # clear stale interrupt

        # Quick FIFO growth sanity
        if self.verbose:
            for t in (0.05, 0.10, 0.20):
                time.sleep(t)
                cnt = self._read16_be(self.REG_FIFO_COUNTH)
                print(f"FIFO count @ +{int(t*1000)} ms: {cnt}")

    def dump_regs(self) -> None:
        regs = [
            ("WHO_AM_I",     self.REG_WHO_AM_I),
            ("PWR_MGMT_1",   self.REG_PWR_MGMT_1),
            ("PWR_MGMT_2",   self.REG_PWR_MGMT_2),
            ("CONFIG",       self.REG_CONFIG),
            ("SMPLRT_DIV",   self.REG_SMPLRT_DIV),
            ("GYRO_CONFIG",  self.REG_GYRO_CONFIG),
            ("ACCEL_CONFIG", self.REG_ACCEL_CONFIG),
            ("USER_CTRL",    self.REG_USER_CTRL),
            ("FIFO_EN",      self.REG_FIFO_EN),
            ("INT_ENABLE",   self.REG_INT_ENABLE),
            ("INT_STATUS",   self.REG_INT_STATUS),
        ]
        print("\nüîé Register dump:")
        for name, r in regs:
            print(f"   {name:<14}= 0x{self._read8(r):02X}")
        print()

    # --------- Capture API ---------
    def capture(
        self,
        duration_s: Optional[float] = 5.0,
        max_samples: Optional[int] = None,
        print_rate_hz: float = 10.0,
        return_raw: bool = False,
        include_temp: bool = True,
    ) -> List[Dict[str, float]]:
        """
        Capture accel/gyro samples via FIFO, with optional temperature per batch.

        duration_s: stop after this many seconds (None to ignore)
        max_samples: stop after this many samples (None to ignore)
        print_rate_hz: terminal update rate (0 disables prints)
        return_raw: return raw ints instead of scaled floats
        include_temp: read temperature once per FIFO drain and add to each record

        Returns list of dicts: {t, ax, ay, az, gx, gy, gz, temp?}
        """
        if duration_s is None and max_samples is None:
            duration_s = 5.0

        data_log: List[Dict[str, float]] = []
        if self.verbose:
            print("üì° Capturing FIFO data... (Ctrl+C to stop)\n")

        t0 = time.time()
        last_print = 0.0
        bytes_per_sample = 12  # 6 accel + 6 gyro

        try:
            while True:
                if duration_s is not None and (time.time() - t0) >= duration_s:
                    break
                if max_samples is not None and len(data_log) >= max_samples:
                    break

                fifo_count = self._read16_be(self.REG_FIFO_COUNTH)

                if fifo_count > 1020:
                    self._fifo_reset()
                    continue

                if fifo_count < bytes_per_sample:
                    time.sleep(0.003)
                    continue

                # Read temperature once per batch if requested
                temp_c = self.read_temperature() if include_temp else None

                to_read = (fifo_count // bytes_per_sample) * bytes_per_sample
                blob = self._fifo_read(to_read)

                for i in range(0, len(blob), bytes_per_sample):
                    ax, ay, az, gx, gy, gz = struct.unpack('>hhhhhh', blob[i:i+bytes_per_sample])

                    rec_t = time.time()
                    if return_raw:
                        rec = {'t': rec_t, 'ax': ax, 'ay': ay, 'az': az, 'gx': gx, 'gy': gy, 'gz': gz}
                    else:
                        rec = {
                            't': rec_t,
                            'ax': ax/self.ACCEL_SCALE, 'ay': ay/self.ACCEL_SCALE, 'az': az/self.ACCEL_SCALE,
                            'gx': gx/self.GYRO_SCALE,  'gy': gy/self.GYRO_SCALE,  'gz': gz/self.GYRO_SCALE
                        }
                    if include_temp:
                        rec['temp'] = temp_c
                    data_log.append(rec)

                # Throttled console status
                if print_rate_hz and print_rate_hz > 0:
                    now = time.time()
                    if (now - last_print) >= (1.0 / print_rate_hz):
                        last_print = now
                        rec = data_log[-1]
                        if return_raw:
                            sys.stdout.write(
                                f"\rA=({rec['ax']:+6d},{rec['ay']:+6d},{rec['az']:+6d})  "
                                f"G=({rec['gx']:+6d},{rec['gy']:+6d},{rec['gz']:+6d})  "
                                + (f"T={rec['temp']:.2f}¬∞C  " if include_temp else "")
                                + f"n_samples={len(data_log)}     "
                            )
                        else:
                            sys.stdout.write(
                                f"\rA=({rec['ax']:+.2f},{rec['ay']:+.2f},{rec['az']:+.2f}) g  "
                                f"G=({rec['gx']:+6.1f},{rec['gy']:+6.1f},{rec['gz']:+6.1f}) ¬∞/s  "
                                + (f"T={rec['temp']:.2f}¬∞C  " if include_temp else "")
                                + f"n_samples={len(data_log)}     "
                            )
                        sys.stdout.flush()

        except KeyboardInterrupt:
            print("\nüõë Stopped by user.")
        finally:
            if self.verbose:
                print(f"\nüì¶ Recorded {len(data_log)} samples.")
            return data_log

    # --------- Cleanup / context manager ---------
    def close(self) -> None:
        try:
            self._write8(self.REG_FIFO_EN, 0x00)
            self._write8(self.REG_USER_CTRL, self._read8(self.REG_USER_CTRL) & ~0x40)
        except Exception:
            pass
        self.i2c.terminate()
        if self.verbose:
            print("üîö FIFO disabled, I¬≤C connection closed.")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()


# --------- Example usage ---------
if __name__ == "__main__":
    try:
        with MPU6050(
            url="ftdi://ftdi:232h/1",
            bus_rate=400_000,
            sample_div=4,     # 200 Hz
            dlpf_cfg=3,       # 42 Hz
            accel_cfg=0x00,   # ¬±2 g
            gyro_cfg=0x00,    # ¬±250 dps
            verbose=True
        ) as imu:
            # e.g., capture for 3 seconds, show live prints, include temperature
            samples = imu.capture(duration_s=300.0, max_samples=None, print_rate_hz=20.0, include_temp=True)

            # samples is a list of dicts:
            # [{'t': ..., 'ax': ..., 'ay': ..., 'az': ..., 'gx': ..., 'gy': ..., 'gz': ..., 'temp': ...}, ...]
    except Exception as e:
        print(f"‚ùå Error: {e}")


üß© Connected via ftdi://ftdi:232h/1 @ 400000 Hz, addr=0x68
üÜî WHO_AM_I = 0x68
FIFO count @ +50 ms: 288
FIFO count @ +100 ms: 612
FIFO count @ +200 ms: 1024

üîé Register dump:
   WHO_AM_I      = 0x68
   PWR_MGMT_1    = 0x01
   PWR_MGMT_2    = 0x00
   CONFIG        = 0x03
   SMPLRT_DIV    = 0x04
   GYRO_CONFIG   = 0x00
   ACCEL_CONFIG  = 0x00
   USER_CTRL     = 0x40
   FIFO_EN       = 0x78
   INT_ENABLE    = 0x01
   INT_STATUS    = 0x11

üì° Capturing FIFO data... (Ctrl+C to stop)

A=(+0.03,+0.00,+0.77) g  G=(  -1.5,  +0.5,  +1.5) ¬∞/s  T=25.85¬∞C  n_samples=21243     
üõë Stopped by user.

üì¶ Recorded 21243 samples.
üîö FIFO disabled, I¬≤C connection closed.
