# VNA V1

November 19th. Goal: Implement an import version of the signal source code, be able to render a "mockup" output streaming data

In [1]:
import sys
import os
import importlib #
sys.path.append(os.path.abspath('../scripts'))
import ipywidgets as widgets
from IPython.display import display
import numpy as np 
from ipywidgets import Output

# Import functions from scripts
import sig_source
importlib.reload(sig_source)
from sig_source import SigSource

import numpy as np
import time
import threading
from bqplot import pyplot as plt
from threading import Thread
from scipy import signal

from pynq import PL
from pynq import allocate
import xrfdc
from pynq import Overlay
import pprint

In [34]:
rfsoc_button = widgets.Button(description="Update RFSOC Code")

out = Output()
def run_rfsoc(func):
    with out:
        out.clear_output
        try: 
            '''
            Code to run RFSOC by writing bitfile
            Placed inside this function to call only when on board
            '''
            #nothing! 
        except Exception as e:
            print(f"Error: {e}")

rfsoc_button.on_click(run_rfsoc)
display(widgets.VBox([widgets.Label(value="Update RFSOC Code"), rfsoc_button, out]))

PL.reset() #important fixes caching issues which have popped up.
ol = Overlay('./design_1.bit')  #locate/point to the bit file
#pprint.pprint(ol.ip_dict) #REMOVE PRINT STATEMENT
#dma_interfaces = [ol.axi_dma_0, ol.axi_dma_1, ol.axi_dma_2, ol.axi_dma_3] # 0 is ADC_D, 1 is ADC_C, 2 is ADC_B, 3 is ADC_A
dma_interface = ol.axi_dma_0
rf = ol.usp_rf_data_converter_0

VBox(children=(Label(value='Update RFSOC Code'), Button(description='Update RFSOC Code', style=ButtonStyle()),…

In [None]:
source = SigSource(start = 10000000, stop = 20000000000, resolution = 100)

start_stop_slider = widgets.FloatRangeSlider(
    value=[source.lowest_freq/(10**6), source.highest_freq/(10^6)],
    min= source.lowest_freq/(10**6),
    max= source.highest_freq/(10**6),
    step= 1,
    #description='Start Frequency (MHz)',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='1.0f',
    layout=widgets.Layout(width='500px') #Change layout width here!! 
)
center_slider = widgets.FloatSlider(value=source.center_freq/(10**6), min=source.lowest_freq/(10**6), max=source.highest_freq/(10**6), step=1)
span_slider = widgets.FloatSlider(value=((source.highest_freq - source.lowest_freq)/2)/10**6, min=1, max=((source.highest_freq - source.lowest_freq))/10**6, step=1)
resolution_slider = widgets.FloatSlider(value = 100, min = 0, max = 1000, step=1)
calc_selection = widgets.ToggleButtons(
    options=['Start-Stop', 'Center-Span'],
    description='Caculate Based On:',
    disabled=False,
    button_style=''
)
update_button = widgets.Button(description="Update Wave")

out = Output()
def update_function(change):
    with out:
        out.clear_output()
        try:
            if (calc_selection.value == "Start-Stop"):
                source.update_parameters(start = start_stop_slider.value[0]*10**6, 
                                        stop = start_stop_slider.value[1]*10**6, 
                                        resolution = resolution_slider.value)
            elif (calc_selection.value == "Center-Span"):
                source.update_parameters(center = center_slider.value*10**6,
                                        span = span_slider.value*10**6, 
                                        resolution = resolution_slider.value)
            
            start_stop_slider.value = (source.start/10**6, source.stop/10**6)
            center_slider.value = source.center/10**6
            span_slider.value = source.span/10**6
            #print("Start frequency: {} Stop Frequency: {}".format(start_stop_slider.value[0], start_stop_slider.value[1]) )
        except Exception as e:
            print(f"Error: {e}")


b1 = widgets.VBox([widgets.VBox([widgets.Label(value="Start Stop Slider (MHz)"), start_stop_slider]), 
                   widgets.HBox([widgets.VBox([widgets.Label(value="Center Slider (Mhz)"),center_slider]), 
                                 widgets.VBox([widgets.Label("Span Slider (Hz)"), span_slider])]),
                    widgets.VBox([widgets.Label(value="Points per Sweep"), resolution_slider]),
                    calc_selection,
                    update_button,
                    out],
                    layout=widgets.Layout(padding='20px 0'))

update_button.on_click(update_function)
display(b1)

In [None]:
print(source.generate_freq_points())

In [11]:
# Sampling frequency
fs = 147.456e6
# Number of samples
n = 65536
T = n/fs

In [None]:
def read_dma():
    # Trigger the DMA transfer and wait for the result
    out_buffer = allocate(400024 * 4, dtype=np.int32)
    # Trigger the DMA transfer and wait for the result
    start_time = time.time()
    dma_interface.recvchannel.transfer(out_buffer)
    dma_interface.recvchannel.wait()
    stop_time = time.time()
    hw_exec_time = stop_time-start_time
    print('Hardware execution time: ',hw_exec_time)

In [None]:
def iq_break_data(in_data):
    val = in_data&0xFFFF
    if val >= 32768:
        real = np.int32(0xFFFF0000|val)
    else:
        real = val
    imag = in_data>>16
    return [real, imag]

In [33]:
# Initialize data lists
time_data = np.linspace(0, T, n)  # Time data (X-axis)
x_axis = [time_data, time_data, time_data, time_data]
# Initialize the plot
fig = plt.figure(title="Real-time Sensor Data", animation_duration=0)
line = plt.plot([], [], colors=["blue", "red", "green", "orange"])  # Initial empty plot

plt.xlim(0, 3e-6)  # Initial X-axis range, will update dynamically
plt.xlabel("Time [s]")


# Function to update the plot with new sensor data
def update_plot():
    while True:
        if is_running.value:
            out_buffer = allocate(400024 * 4, dtype=np.int32)
            # Trigger the DMA transfer and wait for the result
            start_time = time.time()
            dma_interface.recvchannel.transfer(out_buffer)
            dma_interface.recvchannel.wait()
            stop_time = time.time()
            hw_exec_time = stop_time-start_time
            print('Hardware execution time: ',hw_exec_time)
            
            start_time = time.time()
#             out_buffer_real = [[], [], [], []]
#             out_buffer_imag = [[], [], [], []]
#             for i in range(0, len(out_buffer), 4):
#                 [real0, imag0] = iq_break_data(out_buffer[i])
#                 [real1, imag1] = iq_break_data(out_buffer[i + 1])
#                 [real2, imag2] = iq_break_data(out_buffer[i+2])
#                 [real3, imag3] = iq_break_data(out_buffer[i+3])
#                 out_buffer_real[0].append(real0)
#                 out_buffer_real[1].append(real1)
#                 out_buffer_real[2].append(real2)
#                 out_buffer_real[3].append(real3)
#                 out_buffer_imag[0].append(real0)
#                 out_buffer_imag[1].append(real1)
#                 out_buffer_imag[2].append(real2)
#                 out_buffer_imag[3].append(real3)
#             stop_time = time.time()
#             hw_exec_time = stop_time-start_time
            
            actual_output = 2000 #max is len(out_buffer)
            out_buffer0 = []
            out_buffer1 = []
            out_buffer2 = []
            out_buffer3 = []
            for i in range(0, actual_output, 4):
                out_buffer0.append(out_buffer[i])
                out_buffer1.append(out_buffer[i + 1])
                out_buffer2.append(out_buffer[i + 2])
                out_buffer3.append(out_buffer[i + 3])

            #Number of datapoints
            numpoints = 500 #max is 65536
            real0 = []
            imag0 = []
            #extract the two values (I and Q) from each 32 bit write from the hardware side.
            for i in range(numpoints):
                val = out_buffer0[i]&0xFFFF
                if val >= 32768:
                    real0.append(np.int32(0xFFFF0000|val))
                else:
                    real0.append(val)
                imag0.append((out_buffer0[i]>>16))

            real1 = []
            imag1 = []
            #extract the two values (I and Q) from each 32 bit write from the hardware side.
            for i in range(numpoints):
                val = out_buffer1[i]&0xFFFF
                if val >= 32768:
                    real1.append(np.int32(0xFFFF0000|val))
                else:
                    real1.append(val)
                imag1.append((out_buffer1[i]>>16))

            real2 = []
            imag2 = []
            #extract the two values (I and Q) from each 32 bit write from the hardware side.
            for i in range(numpoints):
                val = out_buffer2[i]&0xFFFF
                if val >= 32768:
                    real2.append(np.int32(0xFFFF0000|val))
                else:
                    real2.append(val)
                imag2.append((out_buffer2[i]>>16))

            real3 = []
            imag3 = []
            #extract the two values (I and Q) from each 32 bit write from the hardware side.
            for i in range(numpoints):
                val = out_buffer3[i]&0xFFFF
                if val >= 32768:
                    real3.append(np.int32(0xFFFF0000|val))
                else:
                    real3.append(val)
                imag3.append((out_buffer3[i]>>16))

            #c_data = np.array(real) + 1j*np.array(imag)
            #z = np.fft.fftshift(np.fft.fft(c_data,n))
            #plot_fft(ns,abs(z),65535)
            print('Extract and translate data: ',hw_exec_time)

            start_time = time.time()
            # Update the plot with new data
            line.x = x_axis
            line.y = [real0, real1, real2, real3]
            stop_time = time.time()
            hw_exec_time = stop_time-start_time
            print('Plot Time: ',hw_exec_time)
            

# Toggle button to start/stop the plot
is_running = widgets.ToggleButton(
    value=True,
    description="Running",
    icon="play",
    tooltip="Start/Stop the live plot",
)

# Display the toggle button and plot
display(is_running, fig)

# Function to start the thread for continuous plotting
def start_plot(change):
    if is_running.value:
        # Run the update function in a separate thread to avoid blocking the main thread
        thread = Thread(target=update_plot, daemon=True)
        thread.start()

# Watch the button and start the plot when pressed
is_running.observe(start_plot, names='value')

ToggleButton(value=True, description='Running', icon='play', tooltip='Start/Stop the live plot')

Figure(axes=[Axis(label='Time [s]', scale=LinearScale(max=3e-06, min=0.0)), Axis(orientation='vertical', scale…

Hardware execution time:  0.0012035369873046875
Extract and translate data:  0.0012035369873046875
Plot Time:  0.06237053871154785
Hardware execution time:  0.011342763900756836
Extract and translate data:  0.011342763900756836
Plot Time:  0.06245899200439453
Hardware execution time:  0.03659844398498535
Extract and translate data:  0.03659844398498535
Plot Time:  0.07152986526489258
Hardware execution time:  0.01118612289428711
Extract and translate data:  0.01623082160949707
Extract and translate data:  0.01118612289428711
Plot Time:  0.0841665267944336
Plot Time:  0.051526546478271484
Hardware execution time:  0.006117820739746094
Hardware execution time:  0.0060367584228515625
Extract and translate data:  0.0060367584228515625
Plot Time:  0.06645607948303223
Hardware execution time:  0.026541948318481445
Extract and translate data:  0.026541948318481445
Plot Time:  0.11676549911499023
Hardware execution time:  0.026430130004882812
Extract and translate data:  0.026430130004882812
P

In [28]:
#RANDOM STUFF BELOW LOL 

In [3]:

out_buffer = allocate(400024 * 4, dtype=np.int32)
# Trigger the DMA transfer and wait for the result
start_time = time.time()
dma_interface.recvchannel.transfer(out_buffer)
dma_interface.recvchannel.wait()
stop_time = time.time()
hw_exec_time = stop_time-start_time
print('Hardware execution time: ',hw_exec_time)

start_time = time.time()
#             out_buffer_real = [[], [], [], []]
#             out_buffer_imag = [[], [], [], []]
#             for i in range(0, len(out_buffer), 4):
#                 [real0, imag0] = iq_break_data(out_buffer[i])
#                 [real1, imag1] = iq_break_data(out_buffer[i + 1])
#                 [real2, imag2] = iq_break_data(out_buffer[i+2])
#                 [real3, imag3] = iq_break_data(out_buffer[i+3])
#                 out_buffer_real[0].append(real0)
#                 out_buffer_real[1].append(real1)
#                 out_buffer_real[2].append(real2)
#                 out_buffer_real[3].append(real3)
#                 out_buffer_imag[0].append(real0)
#                 out_buffer_imag[1].append(real1)
#                 out_buffer_imag[2].append(real2)
#                 out_buffer_imag[3].append(real3)
#             stop_time = time.time()
#             hw_exec_time = stop_time-start_time
out_buffer0 = []
out_buffer1 = []
out_buffer2 = []
out_buffer3 = []
for i in range(0, len(out_buffer), 4):
    out_buffer0.append(out_buffer[i])
    out_buffer1.append(out_buffer[i + 1])
    out_buffer2.append(out_buffer[i + 2])
    out_buffer3.append(out_buffer[i + 3])

real0 = []
imag0 = []
#extract the two values (I and Q) from each 32 bit write from the hardware side.
for i in range(65536):
    val = out_buffer0[i]&0xFFFF
    if val >= 32768:
        real0.append(np.int32(0xFFFF0000|val))
    else:
        real0.append(val)
    imag0.append((out_buffer0[i]>>16))

real1 = []
imag1 = []
#extract the two values (I and Q) from each 32 bit write from the hardware side.
for i in range(65536):
    val = out_buffer1[i]&0xFFFF
    if val >= 32768:
        real1.append(np.int32(0xFFFF0000|val))
    else:
        real1.append(val)
    imag1.append((out_buffer1[i]>>16))

real2 = []
imag2 = []
#extract the two values (I and Q) from each 32 bit write from the hardware side.
for i in range(65536):
    val = out_buffer2[i]&0xFFFF
    if val >= 32768:
        real2.append(np.int32(0xFFFF0000|val))
    else:
        real2.append(val)
    imag2.append((out_buffer2[i]>>16))

real3 = []
imag3 = []
#extract the two values (I and Q) from each 32 bit write from the hardware side.
for i in range(65536):
    val = out_buffer3[i]&0xFFFF
    if val >= 32768:
        real3.append(np.int32(0xFFFF0000|val))
    else:
        real3.append(val)
    imag3.append((out_buffer3[i]>>16))

#c_data = np.array(real) + 1j*np.array(imag)
#z = np.fft.fftshift(np.fft.fft(c_data,n))
#plot_fft(ns,abs(z),65535)
stop_time = time.time()
hw_exec_time = stop_time-start_time
print('Extract and translate data: ',hw_exec_time)


Hardware execution time:  0.0014662742614746094
Extract and translate data:  0.0014662742614746094


In [24]:
def test_update_plot(time_data, real0, real1, real2, real3):
    line.x = [time_data, time_data, time_data, time_data]
    line.y = [real0, real1, real2, real3]

In [25]:
fig = plt.figure()

time_data = np.linspace(0, T, n)
x = time_data
y = [real0, real1, real2, real3]
line = plt.plot(x, y)
plt.xlim(0, 1e-6)

fig

Figure(axes=[Axis(scale=LinearScale(max=1e-06, min=0.0)), Axis(orientation='vertical', scale=LinearScale())], …

In [26]:
# Initialize data lists
time_data = np.linspace(0, T, n)  # Time data (X-axis)
# Initialize the plot
fig = plt.figure(title="Real-time Sensor Data", animation_duration=0)
line = plt.plot([], [], colors=["blue", "red", "green", "orange"])  # Initial empty plot

plt.xlim(0, 3e-6)  # Initial X-axis range, will update dynamically
plt.xlabel("Time [s]")

# Toggle button to start/stop the plot
is_running = widgets.ToggleButton(
    value=True,
    description="Running",
    icon="play",
    tooltip="Start/Stop the live plot",
)

# Display the toggle button and plot
display(is_running, fig)

# Function to start the thread for continuous plotting
def start_plot(change):
    if is_running.value:
        # Run the update function in a separate thread to avoid blocking the main thread
        thread = Thread(target=test_update_plot(time_data, real0, real2, real3, real1), daemon=True)
        thread.start()

# Watch the button and start the plot when pressed
is_running.observe(start_plot, names='value')

ToggleButton(value=True, description='Running', icon='play', tooltip='Start/Stop the live plot')

Figure(axes=[Axis(label='Time [s]', scale=LinearScale(max=3e-06, min=0.0)), Axis(orientation='vertical', scale…