### Filter Class Code
Describes filter-type object, which contains filter coefficients for low, mid, or high frequency filter, or no coefficients for unspecified filter.

In [5]:
from scipy.signal import kaiserord, firwin, freqz
from pylab import figure, plot, title, grid, xlabel, ylabel
import numpy as np

class Filter:
    def __init__(self, filter_type='none', num_coefs=99, nyquist_rate=22050, stop_band_attentuation_dB=60, low_transition_freq=400, high_transition_freq=4000):
        """
        __init__ Initializes filter parameters based on passed values

        Args:
            filter_type (str, optional): 'Low', 'mid', or 'high' to specify what type of filter to generate
                    Defaults to 'empty', indicating that no coefs should be generated
            num_coefs (int, optional): Number of filter coefficients to generate
                    Defaults to 99, keep this unless HLS code is changed as well
            nyquist_rate (int, optional): Nyquist rate of signal to filter
                    Defaults to 22050 for the sample audio file "elecric_guitar_sample.wav"
                    #TODO: Implement dynamic nyquist rate (out of scope for now)
            stop_band_attentuation_dB (int, optional): Attenuation frequency for filters
                    Defaults to 60dB
            low_transition_freq (int, optional): Frequency defining the intersection point of the low and mid frequency bands
                    Defaults to 400Hz
            high_transition_freq (int, optional): Frequency defining the intersection point of the mid and high frequency bands
                    Defaults to 4000Hz
        """
        self.filter_type = filter_type
        self.num_coefs = num_coefs
        self.nyquist_rate = nyquist_rate
        self.transition_width = 200 / nyquist_rate
        self.stop_band_attenuation_dB = stop_band_attentuation_dB
        self.low_transition_freq = low_transition_freq
        self.high_transition_freq = high_transition_freq
        
        self.filter_coefs = []
        
        self.calculate_filter_order()
        self.generate_filter_coefs()
    
    
    def calculate_filter_order(self):
        """
        calculate_filter_order Calculates filter order based on transition width and attenuation
        """
        # Calculates filter order and beta value based on transition width and attenuation
        self.FIR_order, self.beta = kaiserord(self.stop_band_attenuation_dB, self.transition_width)
        
        # Scaled by 3 to improve accuracy, this will be used as the number of coefficients to generate
        self.FIR_order *= 3
        
        # Some limitations around filters with an even number of coefficients, so make it odd to avoid this altogether
        if self.FIR_order % 2 == 0:
            self.FIR_order += 1
    
    
    def generate_filter_coefs(self):
        """
        generate_filter_coefs Calls filter generation function based on filter type
        """
        if self.filter_type == "low":
            self.generate_lowfreq_filter()
        elif self.filter_type == "mid":
            self.generate_midfreq_filter()
        elif self.filter_type == "high":
            self.generate_highfreq_filter()
        elif self.filter_type == 'none':
            pass
    
    
    def generate_lowfreq_filter(self):
        """
        generate_lowfreq_filter Generates filter coefficients for a lowpass filter with given parameters
        """
        self.filter_coefs = firwin(self.num_coefs, self.low_transition_freq, pass_zero='lowpass', fs=self.nyquist_rate*2)

    
    def generate_midfreq_filter(self):
        """
        generate_midfreq_filter Generates filter coefficients for a bandstop filter with given parameters
        """
        self.filter_coefs = firwin(self.num_coefs, [self.low_transition_freq / self.nyquist_rate, self.high_transition_freq / self.nyquist_rate], pass_zero='bandpass')

    
    def generate_highfreq_filter(self):
        """
        generate_highfreq_filter Generates filter coefficients for a highpass filter with given parameters
        """
        self.filter_coefs = firwin(self.num_coefs, self.high_transition_freq, pass_zero='highpass', fs=self.nyquist_rate*2)

    
    def plot_filter(self):
        """
        plot_filter Plots the frequency response of the filter
        """        
        figure()

        # Computes the frequecy response of the filter
        response_frequencies, complex_response = freqz(self.filter_coefs, worN=8000)
        
        plot((response_frequencies / np.pi) * self.nyquist_rate, np.absolute(complex_response), linewidth=2)
        
        title("Filter Curves")
        xlabel("Frequency (Hz)")
        ylabel("Filter Gain")
        
        grid(True)
        

print("Done")

Done


#### Filter Test Code
Generates low, mid, and high frequency filters, then combined and scaled coefficients into a single filter

In [None]:
lowfreq_filter = Filter('low')
midfreq_filter = Filter('mid')
highfreq_filter = Filter('high')

# lowfreq_filter.plot_filter()
# midfreq_filter.plot_filter()
# highfreq_filter.plot_filter()

new_filter = Filter()
new_filter.filter_coefs = lowfreq_filter.filter_coefs * 0.6 + midfreq_filter.filter_coefs * 0.3 + highfreq_filter.filter_coefs * 0.4
new_filter.plot_filter()

### Audio Signal Class Code
Describes audio_signal-type objects, which contains various methods for reading/writing audio signal to/from wav files, and methods for equalizing the signal either in software or hardware.

In [44]:
import wave
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import lfilter

from pynq import Overlay
from pynq import allocate

class Audio_Signal:
    def __init__(self, input_file_path, output_file_path):
        """
        __init__ Initializes input and output filepaths based on passed parameters

        Args:
            input_file_path (string): Input wav file
            output_file_path (string): Output wav file
        """
        self.input_file_path = input_file_path
        self.output_file_path = output_file_path
        
        self.sw_output_file_path = "SW_" + self.output_file_path
        self.hw_output_file_path = "HW_" + self.output_file_path

    
    def load_audio(self):
        """
        load_audio Reads audio data from input wav file and extracts signal parameters
        """
        audio_signal = wave.open(self.input_file_path, 'r')
        
        # Pull signal parameters from header
        self.frame_count = audio_signal.getnframes()
        self.channel_count = audio_signal.getnchannels()
        self.sample_width = audio_signal.getsampwidth()
        self.sample_rate = audio_signal.getframerate()
        self.frame_rate = audio_signal.getframerate() * 2

        # Load audio frames from wav file and convert to numpy array
        self.audio_frames = audio_signal.readframes(self.frame_count)
        self.audio_frames = np.frombuffer(self.audio_frames, dtype = np.int16)
        
        # Determine length of audio signal (Should be 2 * frame_cound)
        self.audio_length = len(self.audio_frames)
    
    
    def plot_audio_data(self):
        """
        plot_audio_data Plots amplitude vs. time of audio signal
        """
        # Generate time signal based on audio data and frame rate
        self.time_signal = np.linspace(0, self.audio_length / self.frame_rate, num = self.audio_length)

        plt.figure()
        plt.plot(self.time_signal, self.audio_frames)
        plt.xlabel("Time (s)")
        plt.ylabel("Signal Amplitude")

        plt.show()
        
    
    ###   SOFTWARE EQUALIZATION FUNCTIONS   ###
    def SW_equalize(self, lowfreq_gain, midfreq_gain, highfreq_gain):
        """
        SW_equalize Equalizes the audio signal, scaling by respective gain for low, mid, and high frequencies

        Args:
            lowfreq_gain (float): Specifies gain of low frequency components of audio signal
                    Must be positive for proper equalization
            midfreq_gain (float): Specifies gain of mid frequency components of audio signal
                    Must be positive for proper equalization
            highfreq_gain (float): Specifies gain of high frequency components of audio signal
                    Must be positive for proper equalization
        """
        self.num_coefs = 99

        # Generate, combine, and scale filter coefficients for software function
        self.make_sw_coefs(lowfreq_gain, midfreq_gain, highfreq_gain)

        # Filter audio data using generated filter coefficients
        self.sw_equalized_signal = lfilter(self.coefs, 1.0, self.audio_frames)
        
        # Do we want to call this now or have the use manually call it?
        # Is this the best way to transfer the audio signal between the two parts of the project?
        # Or should we just transfer using numpy arrays?
        self.write_to_wav_file(self.sw_equalized_signal, self.sw_output_file_path)

    
    def make_sw_coefs(self, lowfreq_gain, midfreq_gain, highfreq_gain):
        """
        make_sw_coefs Generates and scales filter coefficients for software equalizer

        Args:
            lowfreq_gain (float): Specifies gain of low frequency components of audio signal
                    Must be positive for proper equalization
            midfreq_gain (float): Specifies gain of mid frequency components of audio signal
                    Must be positive for proper equalization
            highfreq_gain (float): Specifies gain of high frequency components of audio signal
                    Must be positive for proper equalization
        """
        self.lowfreq_filter = Filter('low')
        self.midfreq_filter = Filter('mid')
        self.highfreq_filter = Filter('high')
        
        # Combine filter coefficients into single filter based on gain for each band
        self.coefs = Filter()
        self.coefs.filter_coefs = (self.lowfreq_filter.filter_coefs * lowfreq_gain
                                   + self.midfreq_filter.filter_coefs * midfreq_gain
                                   + self.highfreq_filter.filter_coefs * highfreq_gain)
        
        print(self.coefs)

    
    ###   HARDWARE EQUALIZATION FUNCTIONS   ###
    def equalize(self, lowfreq_gain, midfreq_gain, highfreq_gain):
        """
        equalize Equalizes the audio signal using hardware overlay, scaling by respective gain for low, mid, and high frequencies

        Args:
            lowfreq_gain (float): Specifies gain of low frequency components of audio signal
                    Must be positive for proper equalization
            midfreq_gain (float): Specifies gain of mid frequency components of audio signal
                    Must be positive for proper equalization
            highfreq_gain (float): Specifies gain of high frequency components of audio signal
                    Must be positive for proper equalization
        """
        self.num_codes = 2
        self.num_bands = 3
        self.num_coefs = 99
        
        # Buffer = START + (gain + coefs for each band of equalizer) + STOP + signal values
        self.buffer_length = self.num_codes + self.num_bands * (1 + self.num_coefs) + self.audio_length
        self.buffer_header_length = self.num_codes + self.num_bands * (1 + self.num_coefs)
        
        # Load overlay
        overlay = Overlay('/home/xilinx/pynq/overlays/equalizer_with_coefs/equalizer_with_coefs.bit')
        #overlay?

        # Set auto-reset bit to 0
        hls_ip = overlay.axi_dma_0
        CONTROL_REGISTER = 0x0
        hls_ip.write(CONTROL_REGISTER, 0x01)
        overlay.axi_dma_0.register_map
        
        # Label DMA for data transfers and initialize input/output buffers
        dma = overlay.axi_dma_0     
        self.input_buffer = allocate(shape=(self.buffer_length,), dtype=np.int32)
        self.output_buffer = allocate(shape=(self.buffer_length,), dtype=np.int32)
        
        # Generate filter coefficients for low, mid, and high frequency filters
        self.make_coefs(lowfreq_gain, midfreq_gain, highfreq_gain)

        # Format start/stop coefs, filter gains and coefficients, and audio signal for sending to hardware overlay
        self.format_input()

        # Send and receive data from hardware overlay
        self.run_filter(dma)

        print(self.output_buffer)

        # Write equalized signal to output wav file
        self.hw_equalized_signal = self.output_buffer
        self.write_to_wav_file(self.hw_equalized_signal, self.hw_output_file_path)
    
    
    def make_coefs(self):
        """
        make_coefs Generates 1-D array of filter coefficients for hardware overlay equalizer
                Contains coefficients for low, mid, and high frequency filters, in that order
        """
        self.lowfreq_filter = Filter('low')
        self.midfreq_filter = Filter('mid')
        self.highfreq_filter = Filter('high')
        

        self.coefs = [0] * (self.num_bands * self.num_coefs)
        
        for i in range(self.num_coefs):
            self.coefs[i] = self.lowfreq_filter.filter_coefs[i]
        
        for i in range(self.num_coefs):
            self.coefs[i + self.num_coefs] = self.midfreq_filter.filter_coefs[i]
        
        for i in range(self.num_coefs):
            self.coefs[i + 2 * self.num_coefs] = self.highfreq_filter.filter_coefs[i]

        print(self.coefs)
        
        # Potential replacement for above for loops:
        for i in range(self.num_coefs):
            self.coefs[i] = self.lowfreq_filter.filter_coefs[i]
            self.coefs[i + self.num_coefs] = self.midfreq_filter.filter_coefs[i]
            self.coefs[i + 2 * self.num_coefs] = self.highfreq_filter.filter_coefs[i]
            
        print(self.coefs)
    
    
    def format_input(self, lowfreq_gain, midfreq_gain, highfreq_gain):
        """
        format_input Formats input_buffer for sending to hardware overlay
                Format is as follows:
                    1. Start code (0xBEEF)
                    2. Gain for low frequency components of audio signal
                    3. Low frequency filter coefficients
                    4. Gain for mid frequency components of audio signal
                    5. Mid frequency filter coefficients
                    6. Gain for high frequency components of audio signal
                    7. High frequency filter coefficients
                    8. Stop code (0xABBA)
                    9. Audio signal

        Args:
            lowfreq_gain (float): Specifies gain of low frequency components of audio signal
                    Must be positive for proper equalization
            midfreq_gain (float): Specifies gain of mid frequency components of audio signal
                    Must be positive for proper equalization
            highfreq_gain (float): Specifies gain of high frequency components of audio signal
                    Must be positive for proper equalization
        """
        offset = 0

        self.input_buffer[offset] = 0xBEEF
        offset += 1
        
        self.input_buffer[offset] = lowfreq_gain
        offset += 1

        for coef in range(self.num_coefs):
            self.input_buffer[coef + offset] = self.coefs[coef] * 100
        offset += self.num_coefs

        self.input_buffer[offset] = midfreq_gain
        offset += 1

        for coef in range(self.num_coefs):
            self.input_buffer[coef + offset] = self.coefs[coef + self.num_coefs] * 100
        offset += self.num_coefs
        
        self.input_buffer[offset] = highfreq_gain

        for coef in range(self.num_coefs):
            self.input_buffer[coef + offset] = self.coefs[coef + 2 * self.num_coefs] * 100
        offset += self.num_coefs

        self.input_buffer[offset] = 0xABBA
        offset += 1

        for audio_value in range(self.audio_length):
            self.input_buffer[coef + offset] = self.audio_frames[audio_value] * 100
        offset += self.audio_length
        
        print(offset)
        print(self.buffer_length)
        print(self.audio_length)
        
        for i in range(self.buffer_header_length):
            print(self.input_buffer[i])
            
        print(self.input_buffer)
    
    
    def run_filter(self, dma):
        """
        run_filter Sends input buffer and receives output buffer from hardware overlay

        Args:
            dma (overlay ip): AXI Stream DMA IP
        """
        dma.sendchannel.transfer(self.input_buffer)
        dma.recvchannel.transfer(self.output_buffer)
        dma.sendchannel.wait()
        dma.recvchannel.wait()
        
    
    def write_to_wav_file(self, frames, output_path):
        """
        write_to_wav_file Writes equalized audio signal to output file path

        Args:
            frames (numpy array): Audio signal frames
        """
        output_frames = np.array(frames).astype(np.int16)
        
        with wave.open(output_path, 'w') as output_file:
            output_file.setnchannels(self.channel_count)
            output_file.setsampwidth(self.sample_width)
            output_file.setframerate(self.sample_rate)
            output_file.writeframes(output_frames)


print("Done")

Done


#### Audio Signal Test Code

In [45]:
signal_input_file_path = './electric_guitar_sample.wav'
signal_output_file_path = './outputs/equalized_output.wav'

sample_audio = Audio_Signal(signal_input_file_path, signal_output_file_path)

sample_audio.load_audio()

sample_audio.SW_equalize(0, 1, 0)

#sample_audio.equalize(0, 1, 0)

[4.867392539550531e-19, 0.00021515628455488127, 0.00031269593836189187, 0.000237062141521199, -3.6930917173289425e-05, -0.0004869074113674302, -0.0010315738916985662, -0.0015362031695610888, -0.001842784937254516, -0.0018231293872149024, -0.0014405521785018785, -0.0007957581666125983, -0.00013108865356210162, 0.00022239748699852724, -4.992090637557277e-05, -0.0011177119836033975, -0.002900376582308173, -0.005033779836313536, -0.006940304106176474, -0.007999086601337411, -0.00777263842696988, -0.006214285872901895, -0.0037724229139706376, -0.0013301543492713625, 3.052395256216398e-05, -0.0006132742854032516, -0.0036541547568944292, -0.008704101641169354, -0.014578877557570997, -0.019566039692378794, -0.021932043434739715, -0.02053134461810611, -0.015321675508936096, -0.0075902286776838475, 0.00023660762525976796, 0.005206286170511466, 0.004742197311552983, -0.0024134217356676087, -0.015512150096558177, -0.031622520756985545, -0.046091232240260946, -0.05358886621167503, -0.04950351925058

### Hardware Function Testing
Used for testing hardware overlay separate from Filter and Audio_Signal class integration
Slightly out-of-date, and is not needed at the moment for current implementation

In [None]:
from pynq import Overlay
import numpy as np
from pynq import allocate

overlay = Overlay('/home/xilinx/pynq/overlays/equalizer/equalizer.bit')
overlay?

In [None]:
dma = overlay.equalizerDMA.axi_dma_0
input_buffer = allocate(shape=(200,), dtype=np.int32)
output_buffer = allocate(shape=(99,), dtype=np.int32)

In [None]:
input_buffer[0] = 0xBEEF

for i in range(99):
    input_buffer[i + 1] = i * ((-1) ** i)

input_buffer[100] = 0xABBA

for i in range(99):
    input_buffer[i + 101] = 1

print(input_buffer)

In [None]:
def run_kernel():
    print(0)
    dma.sendchannel.transfer(input_buffer)
    print(1)
    dma.recvchannel.transfer(output_buffer)
    print(2)
    dma.sendchannel.wait()
    print(3)
    dma.recvchannel.wait()
    print(4)

In [None]:
run_kernel()

In [None]:
print(output_buffer)

In [37]:
from pynq import Overlay
import numpy as np
from pynq import allocate
import time

overlay = Overlay('/home/xilinx/pynq/overlays/equalizer/equalizer.bit')
#overlay?
hls_ip = overlay.axi_dma_0

CONTROL_REGISTER = 0x0
hls_ip.write(CONTROL_REGISTER, 0x01)
overlay.axi_dma_0.register_map

dma = overlay.axi_dma_0
input_buffer = allocate(shape=(4000,), dtype=np.int32)
output_buffer = allocate(shape=(4000 - 35,), dtype=np.int32)

# input_buffer_2 = allocate(shape=(200,), dtype=np.int32)

input_buffer[0] = 0xBEEF

for i in range(33):
    input_buffer[i + 1] = i

input_buffer[34] = 0xABBA

for i in range(99):
    input_buffer[i + 35] = 1

print(input_buffer)
print(len(input_buffer))

# for i in range(200):
#     input_buffer_2[i] = 1
    
# print(input_buffer_2)

print(0)
dma.sendchannel.transfer(input_buffer)
print(1)
dma.recvchannel.transfer(output_buffer)
print(2)
dma.sendchannel.wait()
print(3)
dma.recvchannel.wait()
print(4)

print(output_buffer)

[48879     0     1 ...     0     0     0]
4000
0
1
2
3
4
[0 1 3 ... 0 0 0]
