In [1]:
import numpy as np
import scipy.io as scio
import time
import onnxruntime
import adi

In [2]:
print(onnxruntime.get_device())
print(onnxruntime.get_available_providers())

GPU
['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']


In [3]:
def GenerateSymbolMat(MatFile,MatName):
    Symbol_file = scio.loadmat(MatFile)
    Symbol_batch = Symbol_file[MatName]
    _batch_size = Symbol_batch.shape[0]
    _signal_dimension = Symbol_batch.shape[1]
    _symbol_length = Symbol_batch.shape[2]
    _complex_symbol = np.iscomplexobj(Symbol_batch)
    if _complex_symbol:
        print("Input has complex symbols!")
        Symbol_batch_real = np.real(Symbol_batch)
        Symbol_batch_imag = np.imag(Symbol_batch)
        Symbol_batch_mat = np.concatenate((Symbol_batch_real,Symbol_batch_imag), axis = 1).astype('float32')
    else:
        print("Input has real symbols!")
        Symbol_batch_mat = Symbol_batch.astype('float32')
    # Symbol_batch_tensor = torch.tensor(Symbol_batch_mat)
    return Symbol_batch_mat, _complex_symbol, _batch_size, _signal_dimension, _symbol_length


# ZigBee
SymbolFilePath = './txSymbols.mat'
SymbolMat = 'txSymbols'

Symbol_file = scio.loadmat(SymbolFilePath)
Symbol_batch = Symbol_file[SymbolMat]


Data_Symbols, complex_symbol, batch_size, signal_dimension, symbol_length = GenerateSymbolMat(SymbolFilePath, SymbolMat)
print(Data_Symbols.shape)
print(batch_size)
print(signal_dimension)

Input has complex symbols!
(61, 128, 27)
61
64


In [4]:
# Execution Provider of ONNX backend
WiFi_session = onnxruntime.InferenceSession("WiFiMod.onnx",providers=[("CUDAExecutionProvider", {"cudnn_conv_use_max_workspace": "1", "cudnn_conv_algo_search": "DEFAULT"}),"CPUExecutionProvider"])

In [5]:
io_binding = WiFi_session.io_binding()
# OnnxRuntime will copy the data over to the CUDA device if 'input' is consumed by nodes on the CUDA device
io_binding.bind_cpu_input(WiFi_session.get_inputs()[0].name, Data_Symbols)
io_binding.bind_output(WiFi_session.get_outputs()[0].name)
WiFi_session.run_with_iobinding(io_binding)
ONNX_Output_array = io_binding.copy_outputs_to_cpu()[0]

In [6]:
# Post processing for complex signals
realSig = ONNX_Output_array[:,:,0]
imagSig = ONNX_Output_array[:,:,1]
cplxSig = realSig + 1j*imagSig
print(cplxSig.shape)
txWaveform = 0.3*cplxSig * 2**14 # Range mapping
print(txWaveform.shape)

(61, 2240)
(61, 2240)


In [7]:
sdr = adi.Pluto('ip:192.168.2.1') # or whatever your Pluto's IP is
centralFrequency = 2.462e9
samplingRate = 20e6
sdr.sample_rate = int(samplingRate) # sampling rate
sdr.tx_rf_bandwidth = int(samplingRate) # bandwidth
sdr.tx_lo = int(centralFrequency)
sdr.tx_hardwaregain_chan0 = 0 # Increase to increase tx power, valid range is -90 to 0 dB

In [10]:
numFrames = txWaveform.shape[0]
sdr.tx_destroy_buffer()
sdr.tx_cyclic_buffer = False
for frameIdx in range(numFrames):
    sdr.tx(txWaveform[frameIdx,:])
    print("Send Data Packet No.", frameIdx, "!")
    # if (frameIdx%10) == 0:
    #     print("Send Data Packet No.", frameIdx, "!")
    time.sleep(0.5)

Send Data Packet No. 0 !
Send Data Packet No. 10 !
Send Data Packet No. 20 !
Send Data Packet No. 30 !
Send Data Packet No. 40 !
Send Data Packet No. 50 !
Send Data Packet No. 60 !


In [None]:
onnxSig = cplxSig
scio.savemat('./WiFiMatlab/onnxSig.mat', {'onnxSig':onnxSig})

In [None]:
# This part is for test. 
# NNwifiWaveform is waveform samples 
# generated by ONNX model running on laptop.
Waveform_file = scio.loadmat("./txWaveSig.mat")
Waveform_mat = Waveform_file['txWaveSig']
numFrames = Waveform_mat.shape[0]
samples = 0.3*Waveform_mat[0,:]
samples *= 2**14
print(samples[:10])
print(txWaveform[0,:10])

In [None]:
# sdr.tx_destroy_buffer()
# sdr.tx_cyclic_buffer = True
# sdr.tx(samples)
sdr.tx_destroy_buffer()
sdr.tx_cyclic_buffer = False
for frameIdx in range(numFrames):
    sdr.tx(samples)
    print("Send Datapket!")
    time.sleep(0.1)