In [23]:
import time
import threading
import queue
import numpy as np

In [None]:
# ----------------------------------------------------  parameters
SAMPLE_RATE   = 100_000          # Hz
CHUNK_SEC     = 0.2              # s
N_SAMPLES     = int(SAMPLE_RATE * CHUNK_SEC)
QUEUE_DEPTH   = 4 # 許容遅延 0.2 s * 4 = 0.8 s

PULSE_HEIGHT  = 5.0              # amplitude
PULSE_WIDTH   = 0.002            # period  (s)
PULSE_DUTY    = 0.5              # duty
PULSE_PHASE_A   = 0.0              # phase offset (s)
PULSE_PHASE_B   = 0.0005              # phase offset (s)

QUADPULSE_WIDTH = 0.00025            # width (s)

buf_q   = queue.Queue(maxsize=QUEUE_DEPTH)
stop_ev = threading.Event()


In [25]:
THRESHOLD_DEFAULT = 2.5  # V

def gen_pulse_binarize(signal: np.ndarray, threshold: float = THRESHOLD_DEFAULT) -> np.ndarray:
    """Return a boolean array where *True* represents a logic‑high level.

    Parameters
    ----------
    signal : np.ndarray
        Input analogue signal (voltage).
    threshold : float, optional
        Voltage threshold separating logic high and low. Defaults to 2.5 V.

    Returns
    -------
    np.ndarray
        Boolean array of the same length as *signal*.
    """
    return signal > threshold


def gen_pulse_shift(values: np.ndarray) -> np.ndarray:
    """Return array shifted right by one sample (previous value)."""
    return np.concatenate([[values[0]], values[:-1]])


def gen_pulse_direction(
    chan_a: np.ndarray,
    chan_b: np.ndarray,
    threshold: float = THRESHOLD_DEFAULT,
) -> np.ndarray:
    """Compute signed edges (+1 = CW, −1 = CCW, 0 = no edge).

    Parameters
    ----------
    chan_a, chan_b : np.ndarray
        Analogue quadrature signals A and B.
    threshold : float, optional
        Logic threshold for binarisation.

    Returns
    -------
    np.ndarray
        Signed edge array.
    """
    a_logic = gen_pulse_binarize(chan_a, threshold)
    b_logic = gen_pulse_binarize(chan_b, threshold)

    prev_a = gen_pulse_shift(a_logic)
    prev_b = gen_pulse_shift(b_logic)

    cw_edges = prev_b ^ a_logic  # A rising AFTER B ⇒ clockwise
    ccw_edges = prev_a ^ b_logic  # B rising AFTER A ⇒ counter‑clockwise

    return cw_edges.astype(int) - ccw_edges.astype(int)


def pulse_count(direction: np.ndarray) -> int:
    """Return total signed pulse count (quadrature ×4)."""
    return int(np.sum(direction))


def gen_quad_pulse(
    time: np.ndarray,
    direction: np.ndarray,
    width: float,
    amplitude: float,
    sampling_rate: float,
) -> np.ndarray:
    """Generate a rectangular pulse train for each edge in *direction*.

    Parameters
    ----------
    time : np.ndarray
        Global time axis (seconds).
    direction : np.ndarray
        Signed edge array produced by :func:`direction_from_quadrature`.
    width : float
        Pulse width in seconds.
    amplitude : float
        Pulse amplitude.
    sampling_rate : float
        Sampling frequency in Hz.

    Returns
    -------
    np.ndarray
        Pulse train aligned with *time*.
    """
    width_samples = int(width * sampling_rate)
    base_pulse = np.full(width_samples, amplitude)
    pulses = np.convolve(direction, base_pulse, mode="full")[: len(time)]
    return pulses

In [None]:
# ----------------------------------------------------  rectangular-wave helper
def gen_chunk() -> np.ndarray:
    """Return one chunk of Gaussian-distributed samples (μ=0, σ=1)."""
    return np.random.randn(N_SAMPLES).astype(np.float32)


def gen_chunk_pulse(
    t: np.ndarray,
    height: float = PULSE_HEIGHT,
    width: float = PULSE_WIDTH,
    duty: float = PULSE_DUTY,
    phase: float = PULSE_PHASE_A,
) -> np.ndarray:
    """Evaluate a rectangular wave at points t[*] and return float32 array."""
    mod = (t + phase) % width
    return np.where(mod < duty * width, height, 0.0).astype(np.float32)


# pre-compute one relative axis (0 – 0.2 s) to avoid rebuilding it every loop
REL_AXIS = np.arange(N_SAMPLES, dtype=np.float32) / SAMPLE_RATE


# ----------------------------------------------------  producer
def generator() -> None:
    """
    Push (t_axis, pulse_chunk) tuples into the queue.
    The time axis is shifted by +0.2 s on each iteration so that it grows:
        chunk #0: 0.000 … 0.199 999 s
        chunk #1: 0.200 … 0.399 999 s
        chunk #2: 0.400 … 0.599 999 s
        ...
    """
    chunk_idx = 0
    next_t = time.perf_counter()
    while not stop_ev.is_set():
        base = chunk_idx * CHUNK_SEC
        t_axis = REL_AXIS + base  # absolute time axis
        pulse_A = gen_chunk_pulse(t_axis, phase=PULSE_PHASE_A)  # rectangular wave
        pulse_B = gen_chunk_pulse(t_axis, phase=PULSE_PHASE_B)  # rectangular wave
        buf_q.put((t_axis, pulse_A, pulse_B), block=True)  # may block for back-pressure
        chunk_idx += 1
        next_t += CHUNK_SEC  # 0.2 s later
        rest = next_t - time.perf_counter()
        if rest > 0:
            time.sleep(rest)
        else:
            next_t = time.perf_counter()


# ----------------------------------------------------  consumer / processor
def process(t: np.ndarray, dA: np.ndarray, dB: np.ndarray) -> float:
    d_direction = gen_pulse_direction(dA, dB, threshold=THRESHOLD_DEFAULT)
    count = pulse_count(d_direction)
    d_quad = gen_quad_pulse(t, d_direction, QUADPULSE_WIDTH, PULSE_HEIGHT, SAMPLE_RATE)
    return count, d_quad


def consumer() -> None:
    """Pop chunks, process them, and report inter-chunk jitter in ms."""
    last_t = time.perf_counter()
    while not stop_ev.is_set():
        try:
            t, pulse_A, pulse_B = buf_q.get(timeout=1.0)
        except queue.Empty:
            continue

        now = time.perf_counter()
        jitter_ms = (now - last_t) * 1e3
        last_t = now

        count, d_quad = process(t, pulse_A, pulse_B)

        print(
            f"UNIX EPOCH = {now:.6f}  iter = {jitter_ms:6.2f} ms  "
            f"t = {t[0]:.3f} s ~ {t[-1]:.3f} s  count = {count}  "
        )

        del t, pulse_A, pulse_B  # garbage collect
        buf_q.task_done()


# ----------------------------------------------------  launch worker threads
gen_th = threading.Thread(target=generator, daemon=True)
con_th = threading.Thread(target=consumer, daemon=True)
gen_th.start()
con_th.start()

try:
    time.sleep(10)  # run for ~10 s
finally:
    stop_ev.set()
    gen_th.join()
    con_th.join()

UNIX EPOCH = 183847.193906  iter =   2.69 ms  t = 0.000 s ~ 0.200 s  count = -399  
UNIX EPOCH = 183847.393466  iter = 199.56 ms  t = 0.200 s ~ 0.400 s  count = -399  
UNIX EPOCH = 183847.594939  iter = 201.47 ms  t = 0.400 s ~ 0.600 s  count = -399  
UNIX EPOCH = 183847.794648  iter = 199.71 ms  t = 0.600 s ~ 0.800 s  count = -399  
UNIX EPOCH = 183847.993519  iter = 198.87 ms  t = 0.800 s ~ 1.000 s  count = -399  
UNIX EPOCH = 183848.193989  iter = 200.47 ms  t = 1.000 s ~ 1.200 s  count = -400  
UNIX EPOCH = 183848.393426  iter = 199.44 ms  t = 1.200 s ~ 1.400 s  count = -399  
UNIX EPOCH = 183848.594366  iter = 200.94 ms  t = 1.400 s ~ 1.600 s  count = -399  
UNIX EPOCH = 183848.800434  iter = 206.07 ms  t = 1.600 s ~ 1.800 s  count = -399  
UNIX EPOCH = 183848.994840  iter = 194.41 ms  t = 1.800 s ~ 2.000 s  count = -399  
UNIX EPOCH = 183849.199741  iter = 204.90 ms  t = 2.000 s ~ 2.200 s  count = -400  
UNIX EPOCH = 183849.398926  iter = 199.18 ms  t = 2.200 s ~ 2.400 s  count =

In [27]:
d_quad

NameError: name 'd_quad' is not defined