# Spectrum analyser
Real-time spectral analysis tool.  This is an interactive tool that can be run directly in Python or IPython; it does not function in the Noteable service.  When using it on your own computer, please make sure that you install PyAudio so that the tool can interface with your sound card.

In [None]:
import pyaudio
import numpy as np
import matplotlib.pyplot as plt
import scipy.linalg as spl
import os
import struct
import time
from tkinter import TclError

### Parameters to control processing

Parameter | Meaning
--------- | -------
<code>method</code> |One of 'Periodogram', 'Correlogram' or 'MVSE'
<code>windowType</code> |Window type can be one of 'Rectangular', 'Hanning', 'Hamming' or 'Blackman' for periodograms, or 'Triangular', 'Bartlett', 'Parzen' or 'Bohman' for correlograms
<code>samplingRate</code> |Input sampling rate
<code>frameLength</code> |Number of samples per frame of data (controls resolution)
<code>overlap</code> |Number of samples by which successive frames overlap
<code>numberOfBlocks</code> |Length of history to retain
<code>acfMaxDelay</code> |Maximum delay to use in correlation based techniques
<code>MVSE_p</code> |Size of p for the MVSE method
<code>yLimits</code> |y-axis limits for the spectral plot

In [None]:
# Spectral estimator: 'Periodogram', 'Correlogram' or 'MVSE'
method = 'Periodogram'

# Window shape; the valid names depend on the estimator chosen above
windowType = 'Rectangular'

# Acquisition settings
samplingRate = 16000               # input sampling rate (samples per second)
frameLength = 2048                 # samples per frame of data
overlap = frameLength * 0.5        # samples shared by successive frames
numberOfBlocks = 20                # length of history to retain

# Settings for the correlation based estimators
acfMaxDelay = 1023                 # maximum autocorrelation delay
MVSE_p = 255                       # size of p for the MVSE method

# y-axis limits for the spectral plot
yLimits = [-100, 20]

### Function Definitions
The standard autocorrelation functions shipped with Python are not optimised to deal with long sequences where the maximum autocorrelation lag is much shorter than the sequence length.  Here we implement a method that is detailed in the lecture notes that uses FFTs to compute the limited lag autocorrelation in a very efficient manner.

The routines store a limited history of previous calculations to produce an output that can track a non-stationary input.

In [None]:
def Autocorrelation(x, M):
    """
        Calculate autocorrelation of x with maximum lag M.
        
        INPUT:
            x - vector to be correlated
            M - maximum correlation lag
        
        RETURN:
            acf - autocorrelation of x with lag M
        
    """
    global ACFstep3result
    global ACFstep6result
    global ACFs3idx
    global ACFs6idx
    global ACFlastTransform
    
    # As we are analysing a continuous stream of data from the input,
    # we only need to process the latest block, and add its result in
    # to the set of previously stored results.

    # Step 1 - compute the FFT of x_i(n)

    X = np.fft.fft(x.T, 2*M+2)

    # Step 2 - compute X_i(k)X^*_i(k) and store

    ACFstep3result[:, ACFs3idx-1] = np.multiply(np.conj(X), X).real

    # Step 3 - prepare for the next run by moving the index to the next entry
    # Use numpy.mod function to return element-wise remainder of division
    
    ACFs3idx = np.mod(ACFs3idx, ACFstep3result.shape[1]) + 1

    # Step 4 - add in the product of the previous and current
    # transforms, with the phase shift

    phase_shift = np.power((-1), np.arange(0, 2*M+2))
    ACFstep6result[:, ACFs6idx-1] = np.multiply(np.multiply(phase_shift, 
                                                            np.conj(ACFlastTransform)), X).real
    ACFs6idx = np.mod(ACFs6idx, ACFstep6result.shape[1]) + 1

    # Step 5 - Store the most recent transform for the next function call
    ACFlastTransform = X

    # Step 6 - Compute the combined result from the partial results in steps 3 and 6

    result = np.sum(ACFstep3result, 1) + np.sum(ACFstep6result, 1)

    # Step 7 - inverse FFT

    time_domain = ((np.fft.ifft(result, 2*M+2).T).real)

    # Step 9 = select the first M+1 values, and normalise by the data length

    acf = (time_domain[0:M+1]*2/ACFstep3result.size)
    acf = acf.reshape((1,len(acf)))

    # Step 10 - form the final acf
    # Use numpy.concatenate to join a sequence of arrays along axis=0.
    acf = np.concatenate(((np.fliplr(acf))[0], acf[0, 1:]))

    return acf

In [None]:
def calculate_spectrum(signal):
    """
       Calculate the spectrum (in dB) of the input using the technique
       selected by the global `method`.

       The processing parameters are read from module level globals
       (frameLength, numberOfBlocks, recordSize, window, recordedData,
       acfMaxDelay and MVSE_p).

       INPUT:
           signal - newest block of samples

       RETURN:
           method == 'Periodogram':
               (spectralEstimate, new_recordedData), where new_recordedData
               is the data record with its oldest len(signal) samples
               dropped and signal appended.  The global recordedData is
               not modified here; the caller stores the returned record.
           method == 'Correlogram' or 'MVSE':
               spectralEstimate only

           In every case spectralEstimate holds the first frameLength/2
           points of the frameLength point spectral estimate.

       RAISES:
           ValueError - method is not one of the names above
    """
    halfLength = int(frameLength/2)

    if method == 'Periodogram':
        # Slide the data record along to make room for the new samples
        new_recordedData = np.concatenate((recordedData[len(signal):], signal))

        # Bartlett and Welch Periodograms
        estimate = np.zeros(frameLength)
        for blk in range(numberOfBlocks):
            # Identify the starting point of each block of data
            # (recordSize may be held as a float, so convert before slicing)
            start = int(recordSize * blk)
            frame = new_recordedData[start:start + frameLength]
            # Combine its periodogram with the previous results
            estimate = estimate + np.power(abs(np.fft.fft(np.multiply(window, frame))), 2)

        # Select the first half, and convert to dBs
        spectralEstimate = (10*np.log10(estimate[0:halfLength]) -
                            10*np.log10(numberOfBlocks*frameLength))

        return spectralEstimate, new_recordedData

    elif method == 'Correlogram':
        # Blackman and Tukey
        # First compute the autocorrelation
        rxx = Autocorrelation(signal, acfMaxDelay)
        # Apply Wiener Khintchine, with a suitable window
        spectralEstimate = 10*np.log10(abs(np.fft.fft(np.multiply(window, rxx), frameLength))).T
        # Select only the first half for display
        return spectralEstimate[0:halfLength]

    elif method == 'MVSE':
        # Minimum variance spectral estimation
        # It is more efficient to compute ACFs for longer blocks,
        # so use acfMaxDelay, and not MVSE_p which is generally
        # smaller
        rxx = Autocorrelation(signal, acfMaxDelay)
        # Select only the p values starting with delay 0
        # (delay 0 is at index acfMaxDelay of the two sided acf)
        R = spl.toeplitz(rxx[acfMaxDelay:acfMaxDelay+MVSE_p+1])
        # Perform an Eigenvalue decomposition
        [d, v] = spl.eigh(R)
        # Reciprocals of the eigenvalue magnitudes; eps guards against
        # division by zero
        U = np.divide(np.ones(MVSE_p+1), (abs(d) + np.finfo(float).eps))
        # Transform the eigenvectors
        V = abs(np.fft.fft(v.T, frameLength))**2
        # Finally form the spectral estimation
        spectralEstimate = 10*np.log10(MVSE_p) - 10*np.log10(np.dot(V.T, U))
        # Select only the first half for display
        return spectralEstimate[0:halfLength]

    # ValueError is a subclass of Exception, so callers that catch
    # Exception are unaffected
    raise ValueError('Spectral Estimation: Unknown method: %s' % method)

   


In [None]:
def setupAutocorrelation(RECORDSIZE):
    """
       Allocate and reset the global state used by Autocorrelation.

       INPUT:
           RECORDSIZE - number of new samples per update that the history
                        length is derived from

       RETURN:
           recordSize - block length used by the correlation based
                        methods, acfMaxDelay + 1 samples
    """

    global ACFstep3result
    global ACFstep6result
    global ACFs3idx
    global ACFs6idx
    global ACFlastTransform

    # The correlation routines work on blocks of acfMaxDelay+1 samples
    recordSize = acfMaxDelay + 1

    # Number of whole blocks that fit in the span of data to be retained
    retainedSamples = frameLength + (numberOfBlocks-1) * RECORDSIZE
    blockCount = int(np.floor(retainedSamples/recordSize))

    # One column of 2*recordSize values per block, and one per pair of
    # consecutive blocks
    rows = int(recordSize*2)
    ACFstep3result = np.zeros((rows, blockCount))
    ACFstep6result = np.zeros((rows, blockCount-1))

    # Both circular histories start at their first (1-based) column
    ACFs3idx = ACFs6idx = 1

    # There is no previous transform yet, so begin with zeros
    ACFlastTransform = np.zeros((1, recordSize*2))

    return recordSize

In [None]:
def selectInputSource(p):
    """
        Ask the user to choose an input device and return its index.

        Every device that reports at least one input channel is listed,
        together with the name of its host API, and the user is prompted
        until the number of one of the listed devices is entered.

        INPUT:
            p - PyAudio instance used to query the host APIs and devices

        RETURN:
            input_device - index of the selected input device

        RAISES:
            RuntimeError - no device with input channels was found, so no
                           entry could ever be accepted
    """

    # For more detailed information to the user, find the names of the hostAPIs
    hostAPIs = {}
    for i in range(p.get_host_api_count()):
        hostAPI_info = p.get_host_api_info_by_index(i)
        hostAPIs[hostAPI_info.get('index')] = hostAPI_info.get('name')

    # Collect the devices that are able to act as an input
    all_devices = (p.get_device_info_by_index(i)
                   for i in range(p.get_device_count()))
    input_devices = [info for info in all_devices
                     if info.get('maxInputChannels') > 0]

    # Without this check the prompt below could never be satisfied
    if not input_devices:
        raise RuntimeError('No audio input devices available')

    print("\nChoose your input device from the list below:\n")
    for info in input_devices:
        print(f"{info.get('index')}: ({hostAPIs[info.get('hostApi')]}) {info.get('name')}")

    valid_indices = {info.get('index') for info in input_devices}

    # Keep asking until we get a valid input device
    while True:
        try:
            input_device = int(input('\nSelect device: '))
        except ValueError:
            # Not a number at all
            input_device = None

        if input_device in valid_indices:
            return input_device

        print("You must enter a number from the list above")

### Main function
The code below is the main section of the spectrum analyser.  It begins by initialising the data structures for the particular analysis technique selected, and then sets up the audio input and the graphical output.  Finally, the code loops, repeatedly reading input and then updating the spectral analysis.

In [None]:
# Number of new samples used per update: a whole frame when only a single
# block is kept, otherwise the part of a frame that is not overlapped
recordSize = frameLength if numberOfBlocks == 1 else frameLength - overlap

if method == 'Periodogram':
    # Bartlett and Welch window specs
    periodogramWindows = {
        'Rectangular': np.ones,
        'Hanning': np.hanning,
        'Hamming': np.hamming,
        'Blackman': np.blackman,
    }
    if windowType not in periodogramWindows:
        raise Exception('Unknown periodogram window: %s' %windowType)
    window = periodogramWindows[windowType](frameLength)

    # Normalise the window before use, so that the sum of its squared
    # values equals frameLength
    window = window * np.sqrt(frameLength / np.dot(window.T, window))

    # Reserve space for the data record
    recordedData = np.zeros(int(frameLength + (numberOfBlocks-1)*recordSize))

elif method == 'Correlogram':
    # Blackman and Tukey window specs, defined over the lags
    # -acfMaxDelay..acfMaxDelay
    lags = np.arange(-acfMaxDelay, acfMaxDelay+1)
    absLags = abs(lags)

    if windowType == 'Triangular':
        window = (acfMaxDelay-absLags) / acfMaxDelay
    elif windowType == 'Bartlett':
        window = (acfMaxDelay+1-absLags) / (acfMaxDelay+1)
    elif windowType == 'Parzen':
        ratio = absLags/(acfMaxDelay+0.5)
        # Shape used for the central part of the window
        centre = 1 - 6*np.power(ratio, 2) + 6*np.power(ratio, 3)
        # Start from the shape used for the outer parts ...
        window = 2*np.power(1-ratio, 3)
        # ... and then overwrite the middle section with the central shape
        first = int(np.ceil(acfMaxDelay/2))
        last = int(np.floor(3*acfMaxDelay/2)+1)
        window[first:last] = centre[first:last]
    elif windowType == 'Bohman':
        window = (np.multiply(1-absLags/acfMaxDelay, np.cos(np.pi*absLags/acfMaxDelay)) +
                  np.sin(np.pi*absLags/acfMaxDelay)/np.pi)
    else:
        raise Exception('Unknown correlogram window: %s' %windowType)

    # The correlation based methods use their own record size
    recordSize = setupAutocorrelation(recordSize)

elif method == 'MVSE':
    # No window is applied; clearing the name keeps it out of the plot title
    windowType = ''
    recordSize = setupAutocorrelation(recordSize)

else:
    raise Exception('Unknown method: %s'%method)


done = False

# constants
CHUNK = int(recordSize)      # samples read from the device on each pass
FORMAT = pyaudio.paInt16     # audio format (16 bit integer samples)
CHANNELS = 1                 # single channel for microphone
RATE = samplingRate          # samples per second
time_span = 0.02             # length of waveform displayed (seconds)
show_samples = time_span * RATE


# stream object to get data from microphone
p = pyaudio.PyAudio()

# PyAudio reports a missing default input device by raising IOError (an
# alias of OSError), not by returning a message, so the failure has to be
# caught rather than compared against a string.
try:
    p.get_default_input_device_info()
except OSError as err:
    p.terminate()
    raise RuntimeError('No audio input device detected') from err

# Select, and open the input device.  Keep trying until we have no error
while not done:
    input_device = selectInputSource(p)

    try:
        deviceReader = p.open(
                              format = FORMAT,
                              channels = CHANNELS,
                              rate = RATE,
                              input = True,
                              input_device_index = input_device,
                              frames_per_buffer = CHUNK
        )
    except Exception as err:
        # Deliberately broad, as any failure to open simply means asking
        # again, but unlike a bare except this lets KeyboardInterrupt and
        # SystemExit through so that the prompt can be abandoned
        print("\nError opening input stream.  Select another")
        print(f"({err})")
    else:
        done = True

# use this backend to display in separate Tk window
%matplotlib tk

# create matplotlib figure and axes
fig1, ax1 = plt.subplots(1, figsize=(8, 16))

# variable for plotting， 2 is step size
# samples (waveform)
x = np.arange(0, show_samples , 2)      

# linspace generate evenly spaced numbers(CHUNK) 
# over a specified interval(from 0 to RATE-1) 
# frequencies (spectrum)
xf = np.linspace(0, RATE, CHUNK)              

# create a line object with random data
line, = ax1.plot(x, np.random.rand(int(0.5*show_samples)), '-', lw=2)

# format waveform axes
ax1.set_title('Time Series')
ax1.set_xlabel('Time(ms)')
ax1.set_ylabel('Amplitude')
ax1.set_ylim(-0.5, 0.5)
ax1.set_xlim(0, show_samples)
ax1.set_xticks(np.linspace(0, show_samples, 11))
ax1.set_xticklabels(np.arange(0, 22, 2))


fig2, ax2 = plt.subplots(1, figsize=(8, 16))

line_fft, = ax2.plot(xf, np.random.rand(CHUNK), '-', lw=2)

# format spectrum axes
ax2.set_title('%s %s Spectral Estimate' %(method,windowType))
ax2.set_xlabel('Frequency(Hz)')
ax2.set_ylabel('Magnitude(dB)')
ax2.set_ylim(yLimits)
ax2.set_xlim(0, 16000)
ax2.set_xticks(np.arange(0, 18000, 2000))
ax2.set_xticklabels(np.arange(0, 8100, 1000))
ax2.set_yticks(np.arange(-100, 20, 10))
ax2.grid(True)

print('stream started')

# for measuring frame rate
frame_count = 0
start_time = time.time()

try:
    while True:

        # binary data
        signal = deviceReader.read(CHUNK, exception_on_overflow=False)

        # The stream was opened with 16 bit integer samples, so interpret
        # the bytes as signed int16 directly.  Unpacking them as unsigned
        # values and then casting to int16 fails for negative samples on
        # NumPy versions that reject out-of-range Python integers.
        # Dividing by 65536 scales the samples to the range -0.5..0.5 used
        # by the waveform axes.
        data_np = np.frombuffer(signal, dtype=np.int16) / 65536

        # update y data on figure: the x values of the waveform line are
        # every second sample number up to show_samples, so pick out the
        # matching samples
        line.set_ydata(data_np[:int(show_samples):2])

        # compute the spectral estimate and update line
        if method == 'Periodogram':
            (spectralEstimate, recordedData) = calculate_spectrum(data_np)
            line_fft.set_ydata(spectralEstimate)
        else:
            line_fft.set_ydata(calculate_spectrum(data_np))

        # update figure canvas
        try:
            fig1.canvas.draw()
            fig1.canvas.flush_events()
            fig2.canvas.draw()
            fig2.canvas.flush_events()
            frame_count += 1

        except TclError:
            # Drawing failed with a Tk error (e.g. the window has been
            # closed), so report the statistics and stop

            # calculate average frame rate
            frame_rate = frame_count / (time.time() - start_time)

            print('stream stopped')
            print('average frame rate = {:.0f} FPS'.format(frame_rate))
            break
finally:
    # Release the audio device however the loop ends
    deviceReader.stop_stream()
    deviceReader.close()
    p.terminate()

© The University of Edinburgh: Produced by D. Laurenson, School of Engineering. Initial code conversion by Xing Zixiao.