# AutoEQ on demand - for a single speaker channel - with PyTorch
### with TCPi Channel and ESP32 as Server

In [None]:
%pylab inline

In [None]:
from pathfinder import Pathfinder

# abs_paths = ['']
# Pathfinder.relative_paths_from_abs(abs_paths)

# Locations of the sibling code bases (SigmaDSP, Signal Generators, Utilities, TCPi),
# each written as a list of path components relative to this notebook.
# They are handed to Pathfinder so the `sigma` / `tcpi` imports below can resolve
# (presumably by extending sys.path — confirm in Pathfinder).
relative_paths = [['..', '..', '..', '..', 'SigmaDSP', 'bitbucket', 'github', 'codes'],
                  ['..', '..', '..', '..', 'Signal Generators', 'bitbucket', 'github', 'codes'],
                  ['..', '..', '..', '..', 'Utilities', 'bitbucket', 'github', 'codes'], 
                  ['..', '..', '..', '..', 'TCPi', 'bitbucket', 'github', 'codes']] 

Pathfinder.append_relative_paths(relative_paths)
# =====================================================

import pandas as pd

# Display every row of a DataFrame instead of truncating (None removes the row limit).
# Reference: https://thispointer.com/python-pandas-how-to-display-full-dataframe-i-e-print-all-rows-columns-without-truncation/
pd.set_option('display.max_rows', None)
# pd.set_option('display.max_columns', None)
# pd.set_option('display.width', None)
# pd.set_option('display.max_colwidth', -1)
# =====================================================

## TCPi

import time 

from tcpi.protocols.TCPIP1701 import class_finder
# =====================================================

### Client

from tcpi.bus.tcpi_client import I2C as TcpI2C_client

# Create the TCPi I2C client and connect it to the remote server.
tcpi_client = TcpI2C_client(class_finder)
# NOTE(review): the server address is hard-coded; it must match the device on the local network.
server_ip = '192.168.203.36'
tcpi_client.connect(server_ip = server_ip, server_port = 8086)
# Wait one second after connecting — presumably to let the link settle before the first transfer; confirm.
time.sleep(1)
# =====================================================

## DSP processor

from sigma.sigma_dsp.adau import ADAU1401

# The ADAU1401 is driven through the TCPi client, which is passed in as its bus.
# Alternative kept for reference (a directly attached bus object):
# dsp = ADAU1401(bus = bus)
dsp = ADAU1401(bus = tcpi_client)
# =====================================================

## SigmaStudio project file

In [None]:
import os

# Path of the SigmaStudio project file, relative to this notebook.
# The bare name on the last line displays the path as the cell output.
SigmaStudio_project_file_url = os.path.join('..', 'SigmaStudio projects', 'projects', 'demo', 'demo.dspproj')
SigmaStudio_project_file_url

![](https://github.com/Wei1234c/DRC/blob/master/SigmaStudio%20projects/projects/demo/demo.png?raw=true)

## Factory

In [None]:
import os
from sigma.factory import Factory 

# Inputs for the Factory: the project's XML file and the root folder of the
# toolbox cell class files, both relative to this notebook.
project_xml_file_url = os.path.join('..', 'SigmaStudio projects', 'projects', 'demo', 'demo.xml')
class_files_root_url = os.path.join('..', '..', '..', '..',
                                    'SigmaDSP', 'bitbucket', 'github', 'codes',
                                    'sigma', 'sigma_studio', 'toolbox', 'cells')

# The factory is bound to the DSP object created earlier.
factory = Factory(dsp = dsp,
                  project_xml_file_url = project_xml_file_url,
                  class_files_root_url = class_files_root_url)

## IC

In [None]:
# Build the IC object for this project and display its DataFrame
# (the cells, algorithms and parameters of the project).

ic = factory.get_ic()
ic.df

## Cells 

In [None]:
# Cell objects of the IC; they are looked up by cell name (e.g. cells['Delay0']) further down.
cells = factory.get_cells(ic)

### Get Cell objects ready for use.

In [None]:
# # un-comment this to generate a script to embody Cell objects.

# for o in factory.get_cells_manifest():
#     print(o) 

In [None]:
# Bind each Cell to a short variable name for easy access.
# The trailing comment on each line is kept from the generated manifest
# (see the commented-out factory.get_cells_manifest() loop above).

delay0 = cells['Delay0']  # Fractional Delay( 1 )
delay1 = cells['Delay1']  # Fractional Delay( 1 )
fir0 = cells['FIR0']  # FIR( 1 )
fir1 = cells['FIR1']  # FIR( 1 )
gain0 = cells['Gain0']  # Gain (no slew)( 1 )
gain1 = cells['Gain1']  # Gain (no slew)( 1 )
invert0 = cells['Invert0']  # Invert( 1 )
invert1 = cells['Invert1']  # Invert( 1 )
merger0 = cells['Merger0']  # Signal MixerC
merger1 = cells['Merger1']  # Signal MixerC
mute00 = cells['Mute00']  # No Slew (Standard)( 1 )
mute01 = cells['Mute01']  # No Slew (Standard)( 1 )
mute10 = cells['Mute10']  # No Slew (Standard)( 1 )
mute11 = cells['Mute11']  # No Slew (Standard)( 1 )
param_eq0 = cells['Param EQ0']  # PEQ1Chan - Double Precision( 1 )
param_eq1 = cells['Param EQ1']  # PEQ1Chan - Double Precision( 1 )
pink_flt1 = cells['Pink Flt1']  # Pink Noise Filter( 1 )
source_switch = cells['Source_Switch']  # Stereo SW Slew( 3 )
sw_noise = cells['SW_noise']  # Mono SW Slew( 2 )
tone = cells['Tone']  # Tone Synthesis (lookup/sine)( 1 )
white_noise = cells['White_Noise']  # White Noise( 1 )

In [None]:
# Per-channel bundle of the cells that act on that channel, keyed by channel name.
# The helper functions below look cells up through this mapping
# ('muter' and 'peq' are the keys they use).
# NOTE(review): 'left' uses mute00 and 'right' uses mute11; mute01 / mute10 are not
# referenced here — presumably the cross-feed paths, confirm against the project.
channels = {'left' : {'idx_channel': 0,
                      'muter': mute00,
                      'gain' : gain0,
                      'delay': delay0,
                      'peq'  : param_eq0,
                      'fir'  : fir0},
            'right': {'idx_channel': 1,
                      'muter': mute11,
                      'gain' : gain1,
                      'delay': delay1,
                      'peq'  : param_eq1,
                      'fir'  : fir1}}

def mute_all(value = True):
    """Apply the same mute state to the muter of every entry in `channels`.

    value: forwarded unchanged to each muter's `mute()`; defaults to True.
    """
    for settings in channels.values():
        muter = settings['muter']
        muter.mute(value)
        
def source_select_white_noise() -> None:
    """Select the noise source with the inverter disabled.

    Issues three writes in this order: `source_switch` to position 1,
    `invert1` disabled, `sw_noise` to position 1.
    NOTE(review): what each switch position routes is defined by the
    SigmaStudio project and is not visible here — per the function name,
    this combination presumably selects white noise; confirm.
    """
    source_switch.switch(1)
    invert1.enable(False)
    sw_noise.switch(1)
    
def source_select_white_noise_inverted() -> None:
    """Select the noise source with the inverter enabled.

    Same writes as `source_select_white_noise`, except `invert1` is enabled:
    `source_switch` to position 1, `invert1` enabled, `sw_noise` to position 1.
    """
    source_switch.switch(1)
    invert1.enable(True)
    sw_noise.switch(1)
    
def source_select_normal() -> None:
    """Switch `source_switch` to position 2 and un-mute every channel.

    NOTE(review): position 2 is presumably the regular programme input
    (per the function name) — confirm against the SigmaStudio project.
    `invert1` is not touched and keeps whatever state it was last given.
    """
    source_switch.switch(2)
    mute_all(False)

def measure_channel(channel_name):
    """Prepare exactly one channel for measurement.

    Mutes every channel, selects the white-noise source, then un-mutes only
    the channel registered in `channels` under `channel_name`.
    """
    mute_all(True)
    source_select_white_noise()
    target_muter = channels[channel_name]['muter']
    target_muter.mute(False)
    
def set_peq_coeffs(channel_name, coeffs):
    """Write PEQ coefficient values to one channel while it is muted.

    The channel's muter is engaged before the coefficients are written and
    released afterwards; the channel therefore ends up un-muted.

    channel_name: key into `channels`.
    coeffs: passed unchanged to the PEQ cell's `set_coefficients_values()`.
    """
    channel = channels[channel_name]
    channel['muter'].mute(True)
    channel['peq'].set_coefficients_values(coeffs)
    channel['muter'].mute(False)

def reset_peq_coeffs(channel_name):
    """Reset the PEQ coefficients of one channel while it is muted.

    The channel's muter is engaged before the reset and released afterwards;
    the channel therefore ends up un-muted.

    channel_name: key into `channels`.
    """
    channel = channels[channel_name]
    channel['muter'].mute(True)
    channel['peq'].reset_coefficients()
    channel['muter'].mute(False)
    
def reset_delays():
    """Set the delayed percentage of both delay cells (delay0, delay1) to 0."""
    for delay_cell in (delay0, delay1):
        delay_cell.set_delayed_percentage(0)
    
def reset_gains():
    """Set the gain of both gain cells (gain0, gain1) to 1."""
    for gain_cell in (gain0, gain1):
        gain_cell.set_gain(1)

# Frequency Response Tuning

In [None]:
from pathfinder import Pathfinder

# Additional source roots, as path components relative to this notebook:
# the Think DSP code folder, AutoEq, and the local 'codes' folder
# (presumably where the `drc` package imported below lives — confirm).
relative_paths = [['..', '..', '..', '..', '..', '..', '資料科學', 'Allen Downey', 'Think DSP', 'code'],
                  ['..', '..', '..', '..', 'AutoEq'],
                  ['..', 'codes']]

Pathfinder.append_relative_paths(relative_paths)

# import thinkdsp
# from drc.sound import Sound, Channel, InputDevice

import time
from drc.sound import InputDevice, Channel
from drc.filters.peq import PEQs
from drc.tuners.response.equalizer import ResponseEqualizer
from drc.measurements.frequency.responses import FrequencyResponse 
from drc.measurements import Sampler
import pandas as pd

In [None]:
# from pprint import pprint

# pprint(Sound.scan_devices(0))

# Microphone settings

In [None]:
from drc.measurements.frequency.calibrations.miniDSP import UMIK1


# Calibration file for the measurement microphone.
fn_calibration = 'UMIK-1 cal file 7103946.txt'
# Alternative calibration file (90-degree variant), kept for reference:
# fn = 'UMIK-1 cal file 7103946_90deg.txt'

# Create the UMIK-1 object and load the calibration, telling the loader that
# the file has 2 header lines.
mic = UMIK1()
mic.load(file_name = fn_calibration, n_header_lines = 2) 

## Setup Source Signal

In [None]:
# Switch the DSP to the white-noise source (inverter disabled) for the measurements below.
source_select_white_noise()

# Utilities for sampling

In [None]:
SMOOTHING_WINDOW_SIZE = 1/12
TREBLE_SMOOTHING_WINDOW_SIZE = 2
INPUT_DEVICE_IDX = 1    # index of the audio input device used for measurements


def probe(window_size = SMOOTHING_WINDOW_SIZE,
          treble_window_size = TREBLE_SMOOTHING_WINDOW_SIZE,
          n_samplings = 10,
          input_device_idx = INPUT_DEVICE_IDX):
    """Measure the current frequency response through the calibrated microphone.

    All arguments are forwarded to `mic.get_frequency_response()`.

    window_size: smoothing window (presumably in octaves — confirm in the drc package).
    treble_window_size: smoothing window used for the treble range (same caveat).
    n_samplings: number of samplings requested for one measurement.
    input_device_idx: index of the audio input device to record from; it used to be
        fixed to 1 inside the function and is now a keyword with that same default.

    Returns whatever `mic.get_frequency_response()` returns; callers in this
    notebook unpack it into two frequency-response objects.
    """
    return mic.get_frequency_response(input_device_idx = input_device_idx,
                                      window_size = window_size,
                                      treble_window_size = treble_window_size,
                                      n_samplings = n_samplings)
def plot(fr):
    """Plot a frequency response via its `plot_graph()` method.

    The raw curve is styled green and the smoothed curve red, both at 50% opacity.
    Nothing is returned.
    """
    raw_style = {'color': 'green', 'alpha': 0.5}
    smoothed_style = {'color': 'red', 'alpha': 0.5}
    fr.plot_graph(raw_plot_kwargs = raw_style, smoothed_plot_kwargs = smoothed_style)

# Reset Filters

In [None]:
# Start from a clean slate: reset the PEQ of every registered channel.
# (Each reset leaves its channel un-muted.)
for channel_name in channels:
    reset_peq_coeffs(channel_name)

# Prepare to measure one channel

In [None]:
# Channel under test for the rest of the notebook; later cells read `channel_name`.
channel_name = 'left'
# Mute everything, select white noise, then un-mute only this channel.
measure_channel(channel_name)

# Get Samples

In [None]:
# Measure the un-equalized response. The first returned object is plotted here;
# the second (`fr_measurement`) is the input to the PEQ calculation below.
fr_sample, fr_measurement = probe()
plot(fr_sample);

# Calculate PEQ Coefficients 

In [None]:
# _, fr_measurement = probe()

In [None]:
import time

# Tuning limits handed to the equalizer.
n_filters = 10          # upper bound on the number of PEQ filters
max_gain_dB = 12        # gain limit per filter, in dB; also used by the neural network below
bass_boost_gain = 0

# Compensation curve read from CSV; 'zero.csv' is the one in use
# (presumably a flat curve, per its name — confirm).
# compensation_path = 'compensation/harman_over-ear_2018.csv'
compensation_path = 'compensation/zero.csv'
compensation = FrequencyResponse.read_from_csv(compensation_path)

# Compute PEQ filters from the measurement and the compensation curve.
# `peqs` holds the filters; `n_peq_filters` is indexed later (its first element is used).
measurement, peqs, n_peq_filters, peq_max_gains = \
    ResponseEqualizer.get_peq_filters(fr_measurement,
                                      compensation,
                                      max_filters = n_filters,
                                      max_gain_dB = max_gain_dB,
                                      bass_boost_gain = bass_boost_gain)
n_peq_filters, peq_max_gains

In [None]:
# set_peq_coeffs(channel_name, peqs.get_coefficient_sets_values(n_filters = n_filters)  )

In [None]:
# Plot the measurement object returned by the equalizer (trailing ';' hides the repr).
measurement.plot_graph();

# PEQ Filters

In [None]:
# Tabulate the parameter sets of the computed PEQ filters.
pd.DataFrame(peqs.param_sets)

In [None]:
# Coefficient values for the computed filters, requested for `n_filters` filters.
coeffs_equalized_values = peqs.get_coefficient_sets_values(n_filters = n_filters)  

# Apply Filters

In [None]:
# Write the equalized coefficients to the channel under test (muted during the write).
set_peq_coeffs(channel_name, coeffs_equalized_values)

In [None]:
# Re-measure with the filters applied and plot the first object returned by probe().
plot(probe()[0]);

# Enhance Qs and Gains

In [None]:
# Number of network outputs: the first element of `n_peq_filters`; read by NeuralNetwork below.
n_peq_filter = n_peq_filters[0]

In [None]:
# Re-apply the equalized coefficients (same call as under "Apply Filters") —
# presumably so that tuning starts from the computed baseline; confirm.
set_peq_coeffs(channel_name, coeffs_equalized_values)

In [None]:
import torch
import torch.nn.functional as F
from torch import nn
 
layer_size = 256  # default width of every hidden layer


class NeuralNetwork(nn.Module):
    """Small fully connected network that outputs one gain (in dB) per PEQ filter.

    Parameters
    ----------
    n_filters : int, optional
        Number of gain outputs. Defaults to the notebook-level `n_peq_filter`.
    hidden_size : int, optional
        Width of every hidden layer. Defaults to the notebook-level `layer_size`.
    gain_limit_dB : float, optional
        Outputs are bounded to [-gain_limit_dB, +gain_limit_dB]. When omitted,
        the notebook-level `max_gain_dB` is read on every forward pass, which
        is what the original implementation did.

    Calling `NeuralNetwork()` without arguments behaves as before; the
    parameters only make the dependencies on notebook globals explicit.
    """

    def __init__(self, n_filters = None, hidden_size = None, gain_limit_dB = None):
        super().__init__()

        if n_filters is None:
            n_filters = n_peq_filter
        if hidden_size is None:
            hidden_size = layer_size
        self.gain_limit_dB = gain_limit_dB

        # Layers are created in the original order, so a seeded initialisation
        # yields the same weights as before.
        self.fc1 = nn.Linear(1, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc21 = nn.Linear(hidden_size, hidden_size)
        self.fc22 = nn.Linear(hidden_size, hidden_size)  # Q branch: defined, not used by forward()
        self.fc_gain = nn.Linear(hidden_size, n_filters)
        self.fc_Q = nn.Linear(hidden_size, n_filters)    # Q branch: defined, not used by forward()

    def forward(self, x):
        """Map an input of shape (batch, 1) to gains of shape (batch, n_filters).

        The final tanh keeps every gain within +/- the gain limit. The Q branch
        (fc22 / fc_Q) is currently disabled, so only gains are returned.
        """
        limit = max_gain_dB if self.gain_limit_dB is None else self.gain_limit_dB

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))

        gains = F.relu(self.fc21(x))
        gains = torch.tanh(self.fc_gain(gains)).mul(limit)
        return gains

In [None]:
# Network whose outputs are used as the PEQ gains; its sizes come from the
# notebook-level settings defined in earlier cells.
# NOTE(review): no random seed is set, so the initial weights differ between runs.
model = NeuralNetwork()

# Mean squared error between the measured response and the target.
loss_fn = nn.MSELoss()
# loss_fn = nn.CrossEntropyLoss()
# Adam with lr = 5e-4 is the optimizer in use; the alternatives below were left for reference.
# optimizer = torch.optim.SGD(model.parameters(), lr = 1e-3)
optimizer = torch.optim.Adam(model.parameters(), lr = 5e-4)
# optimizer = torch.optim.Adam(model.parameters(), lr = 1e-3)

In [None]:
# Take a fresh measurement; only the second returned object is kept.
_, fr_measurement = probe()


# Frequency range (Hz) used for both the target and the measurements in test_cycle().
# f_min = 200
# f_max = 6000
f_min = 20
f_max = 20000

# Resample the compensation curve onto that range and centre it; its `raw`
# values then serve as the training target.
# NOTE(review): the return values are discarded, so interpolate() / center()
# appear to modify `compensation` in place — re-running this cell would apply
# them again; confirm that this is harmless.
compensation.interpolate(f_min = f_min, f_max = f_max)
# compensation.interpolate(f = fr_measurement.frequency)
compensation.center( )
target = compensation.raw

In [None]:
# Constant network input of shape (1, 1): the model is queried with the same value every cycle.
x = torch.ones((1, 1)) 
# Filter centre frequencies taken from the computed PEQs; test_cycle() pairs them with the gains.
fs = peqs.fs 
    
def test_cycle():
    """Run one closed-loop tuning iteration on the channel under test.

    Steps:
      1. Ask the model for one gain per filter.
      2. Rebuild `peqs` from those gains (centre frequencies from `fs`, fixed Q)
         and write the resulting coefficients to the DSP.
      3. Re-measure the response and compute the MSE against `target`.
      4. Back-propagate a heuristic gradient through the gains and take one
         optimizer step.

    Reads the notebook-level names `model`, `x`, `peqs`, `fs`, `channel_name`,
    `n_filters`, `f_min`, `f_max`, `target`, `loss_fn` and `optimizer`, and
    replaces `peqs.param_sets`.

    Returns
    -------
    tuple
        `(gains, None, loss)` — the middle slot is reserved for Qs, which are
        not trained at the moment.
    """
    # Gradients accumulate in `.grad` across backward() calls, so they must be
    # cleared at the start of every cycle; otherwise each step would also apply
    # the gradients of all previous cycles.
    optimizer.zero_grad()

    gains = model(x)
    filter_gains = [g.item() for g in gains[0]]

    # Rebuild the filter set from scratch: one peaking filter per centre
    # frequency, with a fixed Q (the network's Q output is disabled).
    peqs.param_sets = []
    for fc, gain in zip(fs, filter_gains):
        q = 0.71
        peqs.add_peq(freq_Hz = fc, Q = q, gain_dB = gain)

    # apply filter coefficients, then wait one second before measuring
    set_peq_coeffs(channel_name, peqs.get_coefficient_sets_values(n_filters = n_filters))
    time.sleep(1)

    # probe and plot
    _, fr_measurement = probe(window_size = 1/6, treble_window_size = 2)
    fr_measurement.interpolate(f_min = f_min, f_max = f_max)
    fr_measurement.target = target
    fr_measurement.plot()

    # The loss comes from a physical measurement, so it is not connected to the
    # model through autograd.
    X = torch.tensor(np.expand_dims(fr_measurement.raw, axis = 0))
    Y = torch.tensor(np.expand_dims(target, axis = 0))
    loss = loss_fn(X, Y)

    # Heuristic gradient: the loss is distributed over the outputs in
    # proportion to each gain. It is detached because it is a plain numeric
    # gradient, not part of the graph.
    # NOTE(review): the share is undefined when the gains sum to (nearly) zero
    # and changes sign with that sum — confirm this is the intended update rule.
    gain_gradient = (loss * gains / torch.sum(gains)).detach()
    gains.backward(gradient = gain_gradient, retain_graph = True)

    optimizer.step()

    return gains, None, loss

In [None]:
# Run 50 tuning cycles; each one re-programs the DSP, re-measures and updates the model.
# `Qs` is always None because test_cycle() returns None in that slot.
for i in range(50):
    gains, Qs, loss = test_cycle();
    print(f'run {i} loss: {loss:.2f}'); 

In [None]:
# Measure once more with the tuned filters in place and plot the result.
_, fr_measurement = probe()
fr_measurement.plot()

# Listen

In [None]:
# source_select_normal()

# Close TCP channels

In [None]:
# tcpi_client.stop()