# CVBS Video Capture

This notebook captures 2 seconds of CVBS (composite video) signal using Deep Memory Acquisition (DMA).

## Configuration
- **Target sample rate:** 4fsc = 14.3181818 MS/s (NTSC standard)
- **Actual sample rate:** 15.625 MS/s (Dec 8, closest available)
- **Capture duration:** 2 seconds
- **Expected data size:** ~62.5 MB

## Hardware Setup
- Set input jumpers to **LV** (±1V range) for CVBS signals
- Connect CVBS source to **IN1** (RF input 1)
- Consider 75Ω termination for proper impedance matching

## 1. Initialize Libraries and FPGA

In [None]:
import time
import numpy as np
from matplotlib import pyplot as plt
from rp_overlay import overlay
import rp

# Initialize FPGA overlay and RP library
fpga = overlay()
rp.rp_Init()
print("Red Pitaya initialized")

## 2. Capture Parameters

In [None]:
# Capture configuration
CAPTURE_DURATION_SEC = 2.0
DECIMATION = rp.RP_DEC_8          # 125 MS/s / 8 = 15.625 MS/s
SAMPLE_RATE = 125e6 / 8           # 15.625 MS/s
TARGET_4FSC = 14.3181818e6        # 4x NTSC color subcarrier

# Calculate buffer size
DATA_SIZE = int(CAPTURE_DURATION_SEC * SAMPLE_RATE)
READ_CHUNK_SIZE = 1024 * 256      # Read in 256K sample chunks

print(f"Capture duration: {CAPTURE_DURATION_SEC} seconds")
print(f"Sample rate: {SAMPLE_RATE/1e6:.3f} MS/s (target 4fsc: {TARGET_4FSC/1e6:.3f} MS/s)")
print(f"Sample rate error: {((SAMPLE_RATE - TARGET_4FSC) / TARGET_4FSC * 100):+.2f}%")
print(f"Total samples: {DATA_SIZE:,}")
print(f"Data size: {DATA_SIZE * 2 / 1024 / 1024:.1f} MB")

## 3. Check DMA Memory Region

In [None]:
# Get available DMA memory region
memoryRegion = rp.rp_AcqAxiGetMemoryRegion()
g_adc_axi_start = memoryRegion[1]
g_adc_axi_size = memoryRegion[2]

print(f"DMA Memory Region:")
print(f"  Start address: 0x{g_adc_axi_start:08X}")
print(f"  Size (bytes): {g_adc_axi_size:,} ({g_adc_axi_size / 1024 / 1024:.1f} MB)")
print(f"  Max samples (16-bit): {g_adc_axi_size // 2:,}")

# Verify we have enough memory
required_bytes = DATA_SIZE * 2  # 16-bit samples
if required_bytes > g_adc_axi_size:
    print(f"\nWARNING: Required {required_bytes:,} bytes but only {g_adc_axi_size:,} available!")
    print("Reducing capture duration...")
    DATA_SIZE = g_adc_axi_size // 2
    CAPTURE_DURATION_SEC = DATA_SIZE / SAMPLE_RATE
    print(f"New capture duration: {CAPTURE_DURATION_SEC:.2f} seconds")
else:
    print(f"\nMemory OK: Need {required_bytes/1024/1024:.1f} MB, have {g_adc_axi_size/1024/1024:.1f} MB")

## 4. Configure and Start DMA Acquisition

In [None]:
# Reset acquisition
rp.rp_AcqReset()

# Configure DMA
rp.rp_AcqAxiSetDecimationFactor(DECIMATION)
print(f"Decimation set to: {DECIMATION}")

# Set trigger delay (capture DATA_SIZE samples after trigger)
rp.rp_AcqAxiSetTriggerDelay(rp.RP_CH_1, DATA_SIZE)

# Configure buffer - use channel 1 only, full memory region
rp.rp_AcqAxiSetBufferSamples(rp.RP_CH_1, g_adc_axi_start, DATA_SIZE)
print(f"Buffer configured: {DATA_SIZE:,} samples at 0x{g_adc_axi_start:08X}")

# Enable DMA on channel 1
rp.rp_AcqAxiEnable(rp.RP_CH_1, True)
print("DMA enabled on CH1")

In [None]:
# Start acquisition
print("Starting acquisition...")
rp.rp_AcqStart()

# Brief delay to let acquisition stabilize
time.sleep(0.1)

# Trigger immediately (RP_TRIG_SRC_NOW)
print("Triggering NOW...")
rp.rp_AcqSetTriggerSrc(rp.RP_TRIG_SRC_NOW)

# Wait for trigger to occur
while True:
    trig_state = rp.rp_AcqGetTriggerState()[1]
    if trig_state == rp.RP_TRIG_STATE_TRIGGERED:
        print("Triggered!")
        break

# Wait for buffer to fill
print(f"Capturing {CAPTURE_DURATION_SEC} seconds of data...")
start_time = time.time()

while True:
    fillState = rp.rp_AcqAxiGetBufferFillState(rp.RP_CH_1)[1]
    if fillState:
        break
    elapsed = time.time() - start_time
    if elapsed > CAPTURE_DURATION_SEC + 5:  # Timeout after expected duration + 5s
        print(f"WARNING: Timeout after {elapsed:.1f}s")
        break

capture_time = time.time() - start_time
print(f"Buffer filled in {capture_time:.2f} seconds")

# Stop acquisition
rp.rp_AcqStop()
print("Acquisition stopped")

## 5. Read Data from DMA Buffer

In [None]:
# Get write pointer at trigger
posChA = rp.rp_AcqAxiGetWritePointerAtTrig(rp.RP_CH_1)[1]
print(f"Write pointer at trigger: {posChA}")

# Allocate numpy array for all data
print(f"Allocating {DATA_SIZE * 2 / 1024 / 1024:.1f} MB for data...")
all_data = np.zeros(DATA_SIZE, dtype=np.int16)

# Read data using fast numpy-direct function
print("Reading data from DMA buffer (fast method)...")
read_start = time.time()

# Use rp_AcqAxiGetDataRawNP for direct numpy transfer (MUCH faster)
# This function writes directly to the numpy array without Python loop overhead
try:
    rp.rp_AcqAxiGetDataRawNP(rp.RP_CH_1, posChA, all_data)
    print("Used fast rp_AcqAxiGetDataRawNP method")
except AttributeError:
    # Fallback to chunked read if NP function not available (older firmware)
    print("Fast NP method not available, using chunked read...")
    READ_CHUNK_SIZE = 1024 * 1024  # 1M samples per chunk
    buff = rp.i16Buffer(READ_CHUNK_SIZE)
    read_pos = posChA
    samples_read = 0

    while samples_read < DATA_SIZE:
        chunk_size = min(READ_CHUNK_SIZE, DATA_SIZE - samples_read)
        rp.rp_AcqAxiGetDataRaw(rp.RP_CH_1, read_pos, chunk_size, buff.cast())

        # Use numpy array slicing instead of element-by-element copy
        for i in range(chunk_size):
            all_data[samples_read + i] = buff[i]

        read_pos += chunk_size
        samples_read += chunk_size
        pct = samples_read / DATA_SIZE * 100
        print(f"  {pct:.0f}% ({samples_read:,} samples)")

read_time = time.time() - read_start
print(f"Data read complete: {DATA_SIZE:,} samples in {read_time:.1f}s")
print(f"Read speed: {DATA_SIZE * 2 / read_time / 1024 / 1024:.1f} MB/s")

## 6. Save Data to File

In [None]:
# Save as numpy file
output_filename = f"/tmp/cvbs_capture_{int(time.time())}.npy"

print(f"Saving to {output_filename}...")
np.save(output_filename, all_data)

# Also save metadata
metadata = {
    'sample_rate': SAMPLE_RATE,
    'target_sample_rate_4fsc': TARGET_4FSC,
    'decimation': 8,
    'capture_duration_sec': CAPTURE_DURATION_SEC,
    'num_samples': DATA_SIZE,
    'timestamp': time.time(),
    'dtype': 'int16',
    'bits': 14,
}
metadata_filename = output_filename.replace('.npy', '_metadata.npy')
np.save(metadata_filename, metadata)

print(f"Data saved: {output_filename}")
print(f"Metadata saved: {metadata_filename}")
print(f"\nTo retrieve via SCP:")
print(f"  scp root@192.168.1.169:{output_filename} .")

## 7. Preview Captured Data

In [None]:
# Plot a small segment of the data (first ~1ms = one video line)
samples_per_line = int(63.5e-6 * SAMPLE_RATE)  # ~63.5µs per NTSC line
preview_samples = samples_per_line * 16  # Show 16 lines

fig, axes = plt.subplots(2, 1, figsize=(12, 8))

# Time axis in microseconds
t_us = np.arange(preview_samples) / SAMPLE_RATE * 1e6

# First plot: First 16 lines
axes[0].plot(t_us, all_data[:preview_samples])
axes[0].set_xlabel('Time (µs)')
axes[0].set_ylabel('ADC Value (raw)')
axes[0].set_title(f'First {preview_samples} samples (~16 video lines)')
axes[0].grid(True, alpha=0.3)

# Second plot: Full capture overview (decimated for display)
display_points = 10000
decimate_factor = max(1, len(all_data) // display_points)
t_ms = np.arange(0, len(all_data), decimate_factor) / SAMPLE_RATE * 1000
axes[1].plot(t_ms, all_data[::decimate_factor])
axes[1].set_xlabel('Time (ms)')
axes[1].set_ylabel('ADC Value (raw)')
axes[1].set_title(f'Full capture overview ({CAPTURE_DURATION_SEC}s, decimated {decimate_factor}x for display)')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Statistics
print(f"\nData Statistics:")
print(f"  Min: {all_data.min()}")
print(f"  Max: {all_data.max()}")
print(f"  Mean: {all_data.mean():.1f}")
print(f"  Std: {all_data.std():.1f}")

## 8. Cleanup

In [None]:
# Disable DMA and release resources
rp.rp_AcqAxiEnable(rp.RP_CH_1, False)
rp.rp_Release()
print("Resources released")