<a href="https://colab.research.google.com/github/MatthewHRockwell/ATOMIK-Architecture-Benchmarks/blob/main/ATOMiK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ATOMiK Latency Test

This script benchmarks the core latency of the **ATOMiK architecture**, specifically separating system overhead (Python/OS) from the core logic (Hardware Target).

## How to Run

### 1. With Real Data
1.  Place a video file named `your_video.mp4` in the same directory as the script.
2.  Run the code cell below.
3.  The script will automatically detect the file and use it.

### 2. With Synthetic Data
1.  Ensure `your_video.mp4` is **not** present (or rename it).
2.  Run the code cell below.
3.  The script will automatically fall back to a synthetic test pattern: a moving diagonal line with added random noise, so that consecutive frames differ and produce non-zero deltas for the encoder.

## Output Breakdown
The script outputs a breakdown separating:
* **System Overhead:** Video I/O, pre-processing, and Python serialization loops (eliminated in hardware).
* **ATOMiK Core Logic:** The actual XOR math (the hardware performance target).

In [3]:
# ==============================================================================
#  ATOMiK: Externally Stateless Delta-Driven Architecture
#  Reference Implementation (Python)
#
#  Copyright (c) 2026 Matthew H. Rockwell. All Rights Reserved.
#  PATENT PENDING
#
#  NOTICE: This software is provided for evaluation and academic purposes only.
#  Commercial use, hardware synthesis, or derivation of FPGA/ASIC cores
#  based on this architecture requires a license.
# ==============================================================================

import cv2
import numpy as np
import time
import os
from collections import Counter

# Frame height and width in pixels; the default resize target of load_video_gray.
H, W = 100, 100
# A frame is split into GRID x GRID tiles of TILE x TILE pixels (25 * 4 = 100);
# TWIN is the number of consecutive frames folded into one window pattern.
GRID, TILE, TWIN = 25, 4, 4
# Number of tiles in one window (GRID * GRID = 625).
VOXELS_PER_WIN = GRID * GRID

# -----------------------------
# 1) Load video -> gray frames
# -----------------------------
def load_video_gray(video_path, max_frames=120, target_hw=(H, W)):
    """Load up to `max_frames` grayscale frames, or synthesize a test pattern.

    Args:
        video_path: path to a video file, or the literal string "synthetic"
            to generate a moving-line test pattern instead of reading a file.
        max_frames: maximum number of frames to return.
        target_hw: (height, width) of every returned frame.

    Returns:
        (frames, fps): `frames` is a uint8 array of shape
        [num_frames, height, width]; `fps` is a float (30.0 for synthetic
        data, or when the capture reports a frame rate of 0).

    Raises:
        FileNotFoundError: if the video cannot be opened.
        ValueError: if the video yields fewer than TWIN frames.
    """
    height, width = target_hw

    # --- SYNTHETIC DATA GENERATOR ---
    # Allows the script to run without an external video file for demonstration
    if video_path == "synthetic":
        print("    [Gen] Generating synthetic test pattern (Diagonal Motion)...")
        frames = []
        for t in range(max_frames):
            frame = np.zeros((height, width), dtype=np.uint8)
            # Draw a moving diagonal line to create deltas.
            # OpenCV points are (x, y), i.e. (column, row).
            start_point = (t % width, 0)
            end_point = (0, t % height)
            cv2.line(frame, start_point, end_point, 255, thickness=5)
            # Add some random noise to ensure non-zero deltas
            noise = np.random.randint(0, 50, (height, width), dtype=np.uint8)
            frame = cv2.add(frame, noise)
            frames.append(frame)
        return np.stack(frames, axis=0), 30.0

    # --- REAL VIDEO LOADER ---
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise FileNotFoundError(f"Could not open video: {video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        frames = []
        while len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # cv2.resize expects dsize as (width, height), the reverse of the
            # (height, width) convention used by target_hw and numpy shapes.
            small = cv2.resize(gray, (width, height), interpolation=cv2.INTER_AREA)
            frames.append(small.astype(np.uint8))
    finally:
        # Always free the capture handle, even if decoding raised.
        cap.release()

    if len(frames) < TWIN:
        raise ValueError(f"Need at least {TWIN} frames.")
    return np.stack(frames, axis=0), float(fps)

# -----------------------------------------
# 2) Tile-energy Otsu binarization in 25x25
# -----------------------------------------
def tile_energy_bin25(gray_frames, method="otsu"):
    """
    Average each frame over TILE x TILE pixel tiles, then binarize the tiles.

    gray_frames: [T,H,W] grayscale frames
    method: "otsu" for a per-frame Otsu threshold; anything else uses a
            fixed threshold of 127
    Returns tile25: [T,25,25] uint8 {0,1}
    """
    T, H_, W_ = gray_frames.shape
    assert (H_, W_) == (H, W)

    use_otsu = (method == "otsu")
    otsu_flags = cv2.THRESH_BINARY + cv2.THRESH_OTSU

    tile25 = np.zeros((T, GRID, GRID), dtype=np.uint8)
    for idx, frame in enumerate(gray_frames):
        # Mean brightness per tile, truncated to uint8 (25x25).
        blocks = frame.astype(np.float32).reshape(GRID, TILE, GRID, TILE)
        tile_means = blocks.mean(axis=(1, 3)).astype(np.uint8)

        if use_otsu:
            binary = cv2.threshold(tile_means, 0, 1, otsu_flags)[1]
        else:
            binary = (tile_means >= 127).astype(np.uint8)
        tile25[idx] = binary.astype(np.uint8)
    return tile25

# -----------------------------------------
# 3) Build window patterns P[k] as 4-bit IDs
# -----------------------------------------
def build_window_patterns(tile25):
    """
    Fold every run of 4 consecutive binary tile grids into one 4-bit pattern.

    tile25: [T,25,25] {0,1}
    Returns:
      P: [num_windows,25,25] uint8 in [0..15], where
         P[k] = (t[k]<<3)|(t[k+1]<<2)|(t[k+2]<<1)|(t[k+3]<<0)
      and num_windows = T - TWIN + 1
    """
    frame_count = tile25.shape[0]
    num_windows = frame_count - TWIN + 1

    P = None
    # The frame at offset 0 lands in the most significant bit of the nibble.
    for offset, shift in enumerate((3, 2, 1, 0)):
        plane = tile25[offset:num_windows + offset].astype(np.uint8) << shift
        P = plane if P is None else (P | plane)
    return P  # [num_windows,25,25]

# -----------------------------------------
# 4) LUT: 4-bit pattern -> 64-bit word
# -----------------------------------------
def build_lut_4bit_to_u64():
    """
    Build the 16-entry table that expands a 4-bit pattern into a 64-bit word.

    Each pattern bit becomes one 16-bit lane (0xFFFF if set, 0x0000 if clear);
    pattern bit 3 fills the most significant lane. Entries are big-endian u64.
    """
    words = []
    for pattern in range(16):
        word = 0
        for bit_pos in (3, 2, 1, 0):
            # Make room for the next lane, then fill it if the bit is set.
            word <<= 16
            if (pattern >> bit_pos) & 1:
                word |= 0xFFFF
        words.append(word)
    return np.array(words, dtype=">u8")


LUT_4BIT_U64 = build_lut_4bit_to_u64()

def patterns_to_words(Pk_25x25):
    """Look up the 64-bit word of every tile: [25,25] uint8 0..15 -> [625] >u8"""
    words = LUT_4BIT_U64[Pk_25x25]
    return words.reshape(VOXELS_PER_WIN)

# -----------------------------------------
# 5) TXv5 encode: row-mask + packed nibbles
# -----------------------------------------
def pack_nibbles(nibbles):
    """Pack 4-bit values two per byte, first value in the high half.

    nibbles: list[int 0..15] -> bytes (an odd-length list is padded with 0)
    """
    masked = [value & 0xF for value in nibbles]
    if len(masked) % 2:
        # Odd count: the final byte's low half is zero padding.
        masked.append(0)
    return bytes((hi << 4) | lo for hi, lo in zip(masked[0::2], masked[1::2]))

def unpack_nibbles(buf, n):
    """Split packed bytes back into 4-bit values, high half first.

    buf: bytes, n: number of nibbles -> (list[int], number of bytes consumed)
    """
    need_bytes = (n + 1) // 2
    values = []
    for idx, byte in enumerate(buf[:need_bytes]):
        values.append((byte >> 4) & 0xF)
        # The low half of the last byte is padding when n is odd.
        if 2 * idx + 1 < n:
            values.append(byte & 0xF)
    return values, need_bytes

def encode_txv5(D4):
    """
    Serialize per-step delta grids into a TXv5 byte stream.

    D4: [num_deltas,25,25] uint8 0..15
    Returns stream bytes made of two record kinds:
      'S' + step(2 bytes) + run(2 bytes): `run` consecutive all-zero deltas
      'D' + step(2 bytes) + per row: 4-byte column mask, then the packed
            nibbles of that row's non-zero cells
    All integers are big-endian.
    """
    total = D4.shape[0]
    out = bytearray()

    step = 0
    while step < total:
        delta = D4[step]

        if not delta.any():
            # Extend the skip run over following all-zero deltas (max 65535).
            run_end = step + 1
            run_cap = min(total, step + 65535)
            while run_end < run_cap and not D4[run_end].any():
                run_end += 1
            out += b'S'
            out += int(step).to_bytes(2, 'big')
            out += int(run_end - step).to_bytes(2, 'big')
            step = run_end
            continue

        # delta record
        out += b'D'
        out += int(step).to_bytes(2, 'big')

        for y in range(GRID):
            row = delta[y]
            cols = np.flatnonzero(row).tolist()
            # One bit per changed column; only the low 25 bits are used.
            mask = sum(1 << x for x in cols)
            out += mask.to_bytes(4, 'big')
            if cols:
                out += pack_nibbles([int(row[x]) & 0xF for x in cols])

        step += 1

    return bytes(out)

# -----------------------------------------
# 6) TXv5 decode to reconstruct all windows
# -----------------------------------------
def decode_txv5_to_patterns(P0, stream_bytes, num_windows):
    """Replay a TXv5 stream on top of P0 and return the reconstructed windows.

    Args:
        P0: [GRID,GRID] uint8 pattern grid of window 0.
        stream_bytes: byte stream in the format written by encode_txv5.
        num_windows: total number of windows to reconstruct.

    Returns:
        dict mapping window index -> [GRID,GRID] uint8 pattern grid.

    Raises:
        ValueError: on an unknown record tag, or when a record is cut short
            by the end of the stream.
    """
    decoded = {0: P0.copy()}
    cur_step = 0  # index of the newest window reconstructed so far
    Pcur = P0.copy()
    pos = 0
    end = len(stream_bytes)

    def take(count):
        """Return the next `count` bytes and advance the read cursor."""
        nonlocal pos
        chunk = stream_bytes[pos:pos + count]
        if len(chunk) != count:
            # A short slice would otherwise be parsed as a smaller integer.
            raise ValueError(f"Truncated TXv5 stream at byte {pos}")
        pos += count
        return chunk

    def hold_until(step):
        """Repeat the current pattern for every window up to `step`."""
        nonlocal cur_step
        while cur_step < step:
            decoded[cur_step + 1] = Pcur.copy()
            cur_step += 1

    while pos < end:
        tag = take(1)
        if tag == b'S':
            step = int.from_bytes(take(2), 'big')
            run = int.from_bytes(take(2), 'big')
            hold_until(step)
            # A skip run means the pattern stays unchanged for `run` steps.
            for _ in range(run):
                if (cur_step + 1) < num_windows:
                    decoded[cur_step + 1] = Pcur.copy()
                cur_step += 1
        elif tag == b'D':
            step = int.from_bytes(take(2), 'big')
            hold_until(step)
            d = np.zeros((GRID, GRID), dtype=np.uint8)
            for y in range(GRID):
                mask = int.from_bytes(take(4), 'big')
                xs = [x for x in range(GRID) if (mask >> x) & 1]
                if xs:
                    # Slice only this row's payload rather than copying the
                    # whole remainder of the stream for every row.
                    payload = take((len(xs) + 1) // 2)
                    nibbles, _ = unpack_nibbles(payload, len(xs))
                    for x, val in zip(xs, nibbles):
                        d[y, x] = val & 0xF
            # The delta is the XOR between consecutive window patterns.
            Pcur = (Pcur ^ d).astype(np.uint8)
            if (step + 1) < num_windows:
                decoded[step + 1] = Pcur.copy()
            cur_step = step + 1
        else:
            raise ValueError(f"Bad record tag: {tag}")

    # Windows after the last record carry the final pattern forward.
    hold_until(num_windows - 1)
    return decoded

# -----------------------------------------
# 7) End-to-end runner + Latency Separation
# -----------------------------------------
def run_txv5(video_path="your_video.mp4", max_frames=120, method="otsu"):
    """Run the TXv5 pipeline end to end and print a latency breakdown.

    Stages timed separately: frame loading, tile binarization, pattern
    building plus delta XOR (the "core logic"), and stream serialization.
    The stream is then decoded and compared with the original patterns.

    Args:
        video_path: video file path, or "synthetic" for a generated pattern.
        max_frames: maximum number of frames to load.
        method: binarization method passed to tile_energy_bin25.

    Returns:
        dict with:
          "time_atomik":   core logic time in ms,
          "time_overhead": I/O + pre-processing + serialization time in ms
                           (the same total printed under TOTAL OVERHEAD),
          "stream_size":   encoded stream length in bytes.
    """
    # --- PHASE 1: I/O Overhead (Eliminated in HW) ---
    t0 = time.perf_counter()
    gray_frames, fps = load_video_gray(video_path, max_frames=max_frames)
    t1 = time.perf_counter()
    time_io = (t1 - t0) * 1000

    # --- PHASE 2: Pre-Processing (Pipeline Step) ---
    tile25 = tile_energy_bin25(gray_frames, method=method)
    t2 = time.perf_counter()
    time_preprocess = (t2 - t1) * 1000

    # --- PHASE 3: ATOMiK Core Logic (Hardware Target) ---
    # Step A: Build Patterns (Shift Registers in HW)
    P = build_window_patterns(tile25)

    # Step B: Delta Computation (XOR Gates in HW)
    # This is the "Atomic" time we care about
    num_windows = P.shape[0]
    D4 = (P[1:] ^ P[:-1]).astype(np.uint8)
    t4 = time.perf_counter()

    time_atomik_logic = (t4 - t2) * 1000

    # --- PHASE 4: Serialization Overhead (Python Loops) ---
    # In HW, this is a pipelined FIFO. In Python, it is slow sequential loops.
    stream = encode_txv5(D4)
    t5 = time.perf_counter()
    time_serialization = (t5 - t4) * 1000

    # --- PHASE 5: Verification (Not part of runtime) ---
    decoded = decode_txv5_to_patterns(P[0], stream, num_windows)
    t6 = time.perf_counter()

    # --- Metrics Calculation ---
    # A window the decoder never produced counts as a mismatch, not a crash.
    mism = sum(
        1
        for k in range(num_windows)
        if k not in decoded or not np.array_equal(decoded[k], P[k])
    )
    replay_ok = (mism == 0 and len(decoded) == num_windows)

    # A video may hold fewer frames than max_frames, so use the loaded count.
    n_frames = gray_frames.shape[0]
    duration_sec = (n_frames - 1) / fps
    bytes_per_sec = len(stream) / max(duration_sec, 1e-9)
    time_overhead = time_io + time_preprocess + time_serialization

    # --- PRINT REPORT ---
    print("\n=======================================================")
    print("           ATOMiK LATENCY BREAKDOWN (TXv5)            ")
    print("=======================================================")
    if video_path == "synthetic":
        print(f"Input: Synthetic Test Pattern ({max_frames} Frames)")
    else:
        print(f"Input: {n_frames} Frames @ {fps:.2f} FPS")
    print(f"Total Wall Clock Time: {(t6-t0)*1000:.2f} ms")

    print("\n--- [A] SYSTEM OVERHEAD (Eliminated in FPGA/ASIC) ---")
    print(f"1. Video I/O (Disk Read):        {time_io:.2f} ms")
    print(f"2. Pre-processing (ISP):         {time_preprocess:.2f} ms")
    print(f"3. Python Serialization Loops:   {time_serialization:.2f} ms")
    print(f"   >> TOTAL OVERHEAD:            {time_overhead:.2f} ms")

    print("\n--- [B] ATOMiK CORE LOGIC (The Hardware Target) ---")
    print(f"1. Pattern Encoding + Delta XOR: {time_atomik_logic:.2f} ms")
    print(f"   >> PER FRAME LOGIC TIME:      {time_atomik_logic / max(n_frames, 1):.4f} ms")
    print("   >> PROJECTED HW FREQUENCY:    >100 MHz (Single Cycle)")

    print("\n--- [C] PERFORMANCE METRICS ---")
    print(f"Bandwidth:          {bytes_per_sec/1024.0:.2f} KB/s")
    # Baseline is every loaded pixel stored at 1 bit (gray_frames.size / 8 bytes).
    print(f"Compression Ratio:  {100 - (len(stream)/(gray_frames.size/8)*100):.2f}% vs Raw 1-bit")
    print(f"Replay Verification: {'PASS' if replay_ok else 'FAIL'}")
    print("=======================================================\n")

    return {
        "time_atomik": time_atomik_logic,
        "time_overhead": time_overhead,
        "stream_size": len(stream)
    }

# -------------
# RUN
# -------------
if __name__ == "__main__":
    # Use the local video when it exists; otherwise fall back to the
    # built-in synthetic pattern so the benchmark always runs.
    target_video = "your_video.mp4"

    print("[*] Initializing ATOMiK Benchmark...")
    video_missing = not os.path.exists(target_video)
    if video_missing:
        print(f"[-] Video file '{target_video}' not found.")
        print("    Running benchmark on SYNTHETIC DATA (Moving Line Pattern).")
        path_arg = "synthetic"
    else:
        print(f"[*] Detected local video file: '{target_video}'")
        print("    Running benchmark on REAL DATA.")
        path_arg = target_video

    res = run_txv5(video_path=path_arg, max_frames=120)

[*] Initializing ATOMiK Benchmark...
[*] Detected local video file: 'your_video.mp4'
    Running benchmark on REAL DATA.

           ATOMiK LATENCY BREAKDOWN (TXv5)            
Input: 120 Frames @ 30.00 FPS
Total Wall Clock Time: 4476.21 ms

--- [A] SYSTEM OVERHEAD (Eliminated in FPGA/ASIC) ---
1. Video I/O (Disk Read):        4438.75 ms
2. Pre-processing (ISP):         23.87 ms
3. Python Serialization Loops:   7.12 ms
   >> TOTAL OVERHEAD:            4469.73 ms

--- [B] ATOMiK CORE LOGIC (The Hardware Target) ---
1. Pattern Encoding + Delta XOR: 0.46 ms
   >> PER FRAME LOGIC TIME:      0.0038 ms
   >> PROJECTED HW FREQUENCY:    >100 MHz (Single Cycle)

--- [C] PERFORMANCE METRICS ---
Bandwidth:          3.04 KB/s
Compression Ratio:  91.76% vs Raw 1-bit
Replay Verification: PASS

