# FFT_IFFT_SIGNAL_RECOVER


## Import the libraries

In [1]:
from scipy.io import wavfile
import numpy as np

fs, aud_in = wavfile.read("./birds.wav")

import plotly_express as px
import pandas as pd

# Derive a custom plotting template from `plotly_dark`
import plotly.io as pio
new_template = pio.templates['plotly_white']
new_template.update(dict(layout = dict(
        width         = 800,
        autosize      = False,
        legend        = dict(x=1.1),
        paper_bgcolor = 'rgb(0,0,0,0)',
        plot_bgcolor  = 'rgb(0,0,0,0)',
)))

# Register new template as the default
pio.templates['light_plot'] = new_template
pio.templates.default = 'light_plot'
def to_freq_dataframe(samples, fs):
    """Create a pandas dataframe from an ndarray frequency domain samples"""
    sample_freqs = np.linspace(0, fs, len(samples))
    return pd.DataFrame(dict(
        amplitude = samples[0:int(len(samples)/2)],  
        freq      = sample_freqs[0:int(len(samples)/2)]
    ))

def to_time_dataframe(samples, fs):
    """Create a pandas dataframe from an ndarray of 16-bit time domain samples"""
    num_samples = len(samples)
    sample_times = np.linspace(0, num_samples/fs, num_samples)
    normalised_samples = samples / 2**15
    return pd.DataFrame(dict(
        amplitude = normalised_samples,  
        time      = sample_times
    ))
# Derive a custom plotting template from `plotly_dark`
import plotly.io as pio
new_template = pio.templates['plotly_white']
new_template.update(dict(layout = dict(
        width         = 800,
        autosize      = False,
        legend        = dict(x=1.1),
        paper_bgcolor = 'rgb(0,0,0,0)',
        plot_bgcolor  = 'rgb(0,0,0,0)',
)))

# Register new template as the default
pio.templates['light_plot'] = new_template
pio.templates.default = 'light_plot'

from pynq import Overlay
ol = Overlay('./dsp_pynq.bit')

audio_test=aud_in[np.int16(fs*0.3):np.int16(fs*0.671)]

In [2]:
from pynq import allocate

def get_config_value(forwards, scaling_sched):
    val = 0
    for scaling in scaling_sched:     # [14:1] = scaling schedule
        val = (val << 2) + scaling
    return (val << 1) + int(forwards) # [0] = direction

## define the function fft and ifft

In [3]:
def fft_hw(signal, NFFT):
    # calculate how many NFFT frames are needed to iterate through entire signal
    max_iters = np.int16(np.ceil(len(signal)/NFFT))
    # calculate amount of zeros to add to make up to NFFT multiple
    zeros = np.int16(np.ceil(len(signal)/NFFT))*NFFT - len(signal)
    # increase length to multiple of NFFT
    signal = np.int32(np.append(signal, np.zeros(zeros)))
    
    fft_in_buffer = allocate(shape=(NFFT,),dtype=np.int32)
    fft_out_buffer = allocate(shape=(NFFT*2,),dtype=np.int16)
    
    fft_out = np.zeros(len(fft_out_buffer))
    
    for i in range(0,max_iters):
        np.copyto(fft_in_buffer,signal[NFFT*i:(NFFT*(i+1))])
        
        fft_data.sendchannel.transfer(fft_in_buffer)
        fft_data.recvchannel.transfer(fft_out_buffer)
        
        fft_data.sendchannel.wait()
        fft_data.recvchannel.wait()
        
        fft_out = fft_out + np.array(fft_out_buffer)
        
    fft_out_buffer.close()
    fft_in_buffer.close()
        
    return fft_out

In [4]:
def ifft_hw(signal, NFFT):
    # calculate how many NFFT frames are needed to iterate through entire signal
    #max_iters = np.int16(np.ceil(len(signal)/NFFT))
    # calculate amount of zeros to add to make up to NFFT multiple
    #zeros = np.int16(np.ceil(len(signal)/NFFT))*NFFT - len(signal)
    # increase length to multiple of NFFT
    #signal = np.int16(np.append(signal, np.zeros(zeros)))
    
    signal = np.int16(signal)
    
    fft_in_buffer = allocate(shape=(NFFT*2,),dtype=np.int16)
    fft_out_buffer = allocate(shape=(NFFT*2,),dtype=np.int16)
    
    #fft_out = np.int16(np.zeros(len(fft_out_buffer)))
    

    np.copyto(fft_in_buffer,signal)

    fft_data.sendchannel.transfer(fft_in_buffer)
    fft_data.recvchannel.transfer(fft_out_buffer)

    fft_data.sendchannel.wait()
    fft_data.recvchannel.wait()

    #fft_out = fft_out + np.array(fft_out_buffer)
        
    fft_out_buffer.close()
    fft_in_buffer.close()
        
    return fft_out_buffer

## Import wave for testing

In [5]:
fs, aud_in = wavfile.read("./birds.wav")

total_len=len(aud_in)
NFFT=2**14
aud_out=np.int16(np.zeros(total_len*2))

## Overlay

In [6]:
from pynq import allocate

from pynq import Overlay
ol = Overlay('./dsp_pynq.bit')
def get_config_value(forwards, scaling_sched):
    val = 0
    for scaling in scaling_sched:     # [14:1] = scaling schedule
        val = (val << 2) + scaling
    return (val << 1) + int(forwards) # [0] = direction


## FFT and IFFT

In [7]:
for ii in range(0,np.int16(total_len/ NFFT)):
    fft_data = ol.fft_top.fft_data
    fft_config = ol.fft_top.fft_config
    config_value = get_config_value(True, [1, 1, 2, 2, 2, 2, 2])
    fft_buffer_config = allocate(shape=(1,),dtype=np.int16)
    fft_buffer_config[0] = config_value
    fft_config.sendchannel.transfer(fft_buffer_config)
    fft_config.sendchannel.wait()
    fft_config.sendchannel.wait()
    aud_hw = np.asarray(aud_in[NFFT*ii:NFFT*(ii+1)],np.int32)
    aud_fft = fft_hw(aud_hw[::],NFFT)
    
    for jj in range(0,np.int16(NFFT)*2):
        if jj>1040*2 and jj<15343*2:
            aud_fft[jj]=0
    
    ifft_data = ol.fft_top.fft_data
    ifft_config = ol.fft_top.fft_config
    config_value = get_config_value(False, [0, 0, 0, 0, 0, 0, 2])

    ifft_buffer_config = allocate(shape=(1,),dtype=np.int16)

    ifft_buffer_config[0] = config_value

    ifft_config.sendchannel.transfer(ifft_buffer_config)
    ifft_config.sendchannel.wait()
    ifft_config.sendchannel.wait()
    aud_ifft = ifft_hw(aud_fft,NFFT)
    aud_out[NFFT*ii*2:NFFT*(ii*2+2)]=aud_ifft[::1]
    

## Get the results for one bird

In [8]:
from IPython.display import Audio
scaled = np.int16(aud_out[::2]/np.max(abs(aud_out[::2])) * 2**15 - 1)
wavfile.write('ifft.wav', fs, scaled)
Audio('ifft.wav')

In [9]:
fft_data = ol.fft_top.fft_data
fft_config = ol.fft_top.fft_config
config_value = get_config_value(True, [2, 2, 2, 2, 2, 2, 2])
fft_buffer_config = allocate(shape=(1,),dtype=np.int16)
fft_buffer_config[0] = config_value
fft_config.sendchannel.transfer(fft_buffer_config)
fft_config.sendchannel.wait()
fft_config.sendchannel.wait()
aud_fft = fft_hw(np.int32(aud_out[::2]),NFFT)
# make complex number x[n] + j*x[n+1]
aud_fft_c = np.int16(aud_fft[0::2])+1j*np.int16(aud_fft[1::2])
aud_fft_abs = np.abs(aud_fft_c)
# Plot FFT

px.line(
    to_freq_dataframe(aud_fft_abs, fs),
    x='freq', y='amplitude',
    labels = dict(amplitude='Amplitude', freq='Freq (Hz)'),
    template='light_plot'
)

## Testing for anthor bird

In [10]:
for ii in range(0,np.int16(total_len/ NFFT)):
    fft_data = ol.fft_top.fft_data
    fft_config = ol.fft_top.fft_config
    config_value = get_config_value(True, [1, 1, 2, 2, 2, 2, 2])
    fft_buffer_config = allocate(shape=(1,),dtype=np.int16)
    fft_buffer_config[0] = config_value
    fft_config.sendchannel.transfer(fft_buffer_config)
    fft_config.sendchannel.wait()
    fft_config.sendchannel.wait()
    aud_hw = np.asarray(aud_in[NFFT*ii:NFFT*(ii+1)],np.int32)
    aud_fft = fft_hw(aud_hw[::],NFFT)
    
    for jj in range(0,np.int16(NFFT)*2):
        if jj<1040*2 or jj>15343*2:
            aud_fft[jj]=0
    
    ifft_data = ol.fft_top.fft_data
    ifft_config = ol.fft_top.fft_config
    config_value = get_config_value(False, [0, 0, 0, 0, 0, 0, 2])

    ifft_buffer_config = allocate(shape=(1,),dtype=np.int16)

    ifft_buffer_config[0] = config_value

    ifft_config.sendchannel.transfer(ifft_buffer_config)
    ifft_config.sendchannel.wait()
    ifft_config.sendchannel.wait()
    aud_ifft = ifft_hw(aud_fft,NFFT)
    aud_out[NFFT*ii*2:NFFT*(ii*2+2)]=aud_ifft[::1]
    

In [11]:
from IPython.display import Audio
scaled = np.int16(aud_out[::2]/np.max(abs(aud_out[::2])) * 2**15 - 1)
wavfile.write('ifft.wav', fs, scaled)
Audio('ifft.wav')

In [12]:
fft_data = ol.fft_top.fft_data
fft_config = ol.fft_top.fft_config
config_value = get_config_value(True, [2, 2, 2, 2, 2, 2, 2])
fft_buffer_config = allocate(shape=(1,),dtype=np.int16)
fft_buffer_config[0] = config_value
fft_config.sendchannel.transfer(fft_buffer_config)
fft_config.sendchannel.wait()
fft_config.sendchannel.wait()
aud_fft = fft_hw(np.int32(aud_out[::2]),NFFT)
# make complex number x[n] + j*x[n+1]
aud_fft_c = np.int16(aud_fft[0::2])+1j*np.int16(aud_fft[1::2])
aud_fft_abs = np.abs(aud_fft_c)
# Plot FFT

px.line(
    to_freq_dataframe(aud_fft_abs, fs),
    x='freq', y='amplitude',
    labels = dict(amplitude='Amplitude', freq='Freq (Hz)'),
    template='light_plot'
)

## FFT for the two birds

In [13]:

fft_data = ol.fft_top.fft_data
fft_config = ol.fft_top.fft_config
config_value = get_config_value(True, [1, 1, 2, 2, 2, 2, 2])
fft_buffer_config = allocate(shape=(1,),dtype=np.int16)
fft_buffer_config[0] = config_value
fft_config.sendchannel.transfer(fft_buffer_config)
fft_config.sendchannel.wait()
fft_config.sendchannel.wait()
aud_fft = fft_hw(audio_test,NFFT)
# make complex number x[n] + j*x[n+1]
aud_fft_c = np.int16(aud_fft[0::2])+1j*np.int16(aud_fft[1::2])
aud_fft_abs = np.abs(aud_fft_c)
# Plot FFT

px.line(
    to_freq_dataframe(aud_fft_abs, fs),
    x='freq', y='amplitude',
    labels = dict(amplitude='Amplitude', freq='Freq (Hz)'),
    template='light_plot'
)