In [331]:
import numpy as np
import librosa
from IPython.display import Audio
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

In [480]:

def gcc_phat(sig, refsig, fs=1, interp=16):
    '''
    This function computes the offset between the signal sig and the reference signal refsig
    using the Generalized Cross Correlation - Phase Transform (GCC-PHAT) method.
    '''
    # make sure the length for the FFT is larger or equal than len(sig) + len(refsig)
    n = sig.shape[0] + refsig.shape[0]

    # Generalized Cross Correlation Phase Transform
    SIG = np.fft.rfft(sig, n=n)
    REFSIG = np.fft.rfft(refsig, n=n)
    R = SIG * np.conj(REFSIG)

    cc = np.fft.irfft(R / (np.abs(R) + 1e-10), n=interp * n)  
    

    max_shift = int(interp * n / 2)
    # print(np.argmax(np.abs(cc)))

    cc = np.concatenate((cc[-max_shift:], cc[:max_shift+1]))

    # print(np.argmax(np.abs(cc)))
    # # find max cross correlation index
    
    # shift = np.argmax(np.abs(cc)) - max_shift

    # tau = shift / float(interp * fs)
    return  cc#,tau


In [481]:
# (np.argmax(np.abs(cc)) - int(sig_len * 16)) / (16000 * 16)

In [482]:
# np.linspace(-sig_len/ fs, sig_len/ fs, 16 * sig_len * 2 +1).astype(np.float32)[np.argmax(np.abs(cc))]

In [483]:
# np.argmax(np.abs(cc))

In [None]:
# fs = 16000  # Sampling frequency (Hz)
# f = 10  # Frequency of sine wave (Hz)
# seconds = 1  # Duration of the signal (s)
# t = np.arange(0, seconds, 1/fs)

# # Generate Sine Waves
# sig1 = wav[0]#np.concatenate([np.zeros(1000),np.sin(2 * np.pi * f * t)*3, np.zeros(1000)])  # First sine wave
# delay_time = 0.02 # 1/4 second delay
# delay_samples = int(delay_time * fs)  # Convert delay to samples
# print(delay_samples / fs)
# sig2 = np.roll(sig1, delay_samples)  # Second sine wave with delay
# sig_len = sig1.shape[0]

# delay_axs = np.linspace(-sig_len/ fs, sig_len/ fs,16*sig_len * 2 +1).astype(np.float32)

# # Apply GCC-PHAT
# cc = gcc_phat(sig1, sig2, fs)

# delay_axs[np.argmax(np.abs(cc))]
# # (np.argmax(np.abs(cc )) - sig1.shape[0])  / fs
# # # Plot the Cross-Correlation
# # plt.figure(figsize=(10, 5))
# # plt.plot(tau_axis, cc)
# # plt.axvline(x=estimated_delay, color='r', linestyle='--', label=f'Estimated Delay = {estimated_delay:.4f} s')
# # plt.xlabel("Time Shift (s)")
# # plt.ylabel("Cross-Correlation")
# # plt.title("GCC-PHAT Cross-Correlation")
# # plt.legend()
# # plt.grid()
# # plt.show()

# # # Print the Estimated Delay
# # print(f"Estimated Delay: {estimated_delay:.4f} seconds")
# # print(f"Expected Delay: {delay_time} seconds")

0.02


-0.02

In [469]:
import itertools
def srp_phat(signals, mic_positions, grid, fs, interp = 16):
    """Perform SRP-PHAT localization."""
    num_mics = len(mic_positions)
    srp_map = np.zeros(grid.shape[0], dtype=np.float32)

    sig_len = signals.shape[1]

    delay_axs = np.linspace(-sig_len/ fs, sig_len/ fs, interp*sig_len * 2 +1).astype(np.float32)


    n_channels = signals.shape[0]
    # taus = np.zeros((n_channels, n_channels))
    gccs = {}
    for (i, j) in itertools.combinations(range(num_mics), 2):
        if i == j:
            continue
        cc = gcc_phat(signals[i], signals[j], fs=fs, interp=interp)
        # print(i,j,tau)
        gccs[(i,j)] = cc


    for idx,candidate in enumerate(grid):
        power = 0
        for (i, j) in itertools.combinations(range(num_mics), 2):
            if i == j:
                continue
            dist_i = np.linalg.norm(candidate - mic_positions[i])
            dist_j = np.linalg.norm(candidate - mic_positions[j])
            tau = (dist_i - dist_j) / 343.0  # Speed of sound (m/s)
            
            gcc = gccs[(i,j)]
            
            closest_idx = np.argmin(np.abs(delay_axs - tau))
            power += gcc[closest_idx]

        srp_map[idx] = power

    return srp_map

In [521]:
d = 0.2
fs = 16000

mic_pos = np.array([
          [2.6 - d / 2, 3, 1.5], [2.6 + d / 2, 3, 1.5], [2.6, 3-d/2, 1.5], [2.6 , 3+d/2, 1.5]

      ])
room_dim = [5.2,6.2,3.5]


In [551]:
from scipy.signal import fftconvolve
import rir_generator as rir



def rnd_speaker_pos():
  x = np.random.uniform(low=1, high=4, size=(1,))[0]
  y = np.random.uniform(low=1, high=5, size=(1,))[0]
  speaker_pos =  [x, y, 1.5]
  return speaker_pos

def gen_rir(t60,pos):

  h = rir.generate(
      c=340,                  # Sound velocity (m/s)
      fs=fs,                  # Sample frequency (samples/s)
      r=mic_pos,

      s=pos,          # Source position [x y z] (m)
      L=room_dim,            # Room dimensions [x y z] (m)
      reverberation_time=t60, # Reverberation time (s)
      nsample=int(t60 * fs),           # Number of output samples
  )
      # [1.617, 2.45, 1.7]
  return h

def add_white_noise(signal, snr_db=10):
    if len(signal.shape) == 1 :
      signal = signal.reshape(1,-1)
    signal_power = np.mean(signal**2,axis=1, keepdims=True)
    noise_power = signal_power / (10**(snr_db / 10))
    noise = np.sqrt(noise_power) * np.random.normal(0, 1, signal.shape) # spectral and spatial white noise - uncorollated sensors
    noisy_signal = signal + noise
    return noisy_signal, noise


def convolve_rir(signal, rirs):
     rirs = rirs.T
     return np.stack([np.convolve(signal, rir, mode='full') for rir in rirs])


In [554]:
wav, sr = librosa.load('12.wav', sr=None, mono=False)
speaker_pos = rnd_speaker_pos()
imp03_main = gen_rir(0.3,speaker_pos)

main_speaker_room = convolve_rir(wav, imp03_main)

main_speaker_room_noisy_white, white_noise = add_white_noise(main_speaker_room, snr_db=15)

grid = np.array([[x, y, 1.5] for x in np.linspace(0, room_dim[0], 20) for y in np.linspace(0, room_dim[1], 20)])

srp = srp_phat(main_speaker_room_noisy_white, mic_pos, grid, sr, interp=1)
best_pos = srp.argmax()


srp = srp_phat(main_speaker_room_noisy_white, mic_pos, grid, sr, interp=1)
best_pos = srp.argmax()
print(f"Estimated Source Position: {grid[best_pos]}")
print(f"True Source Position: {speaker_pos}")


srp_map = srp
srp_map = srp_map.reshape(20, 20)
fig = px.imshow(srp_map.T, x=np.linspace(0, room_dim[0], 20), y=np.linspace(0, room_dim[1], 20))
#  add the microphones
fig.add_trace(go.Scatter(x=mic_pos[:, 0], y=mic_pos[:, 1], mode='markers', marker=dict(size=10, color='red')))
# add the estimated source position
fig.add_trace(go.Scatter(x=[grid[best_pos][0]], y=[grid[best_pos][1]], mode='markers', marker=dict(size=10, color='blue')))
fig.add_trace(go.Scatter(x=[speaker_pos[0]], y=[speaker_pos[1]], mode='markers', marker=dict(size=10, color='green')))
fig.update_layout(title='SRP-PHAT Map',
                  xaxis_title='X (m)',
                  yaxis_title='Y (m)')
fig.show()


Estimated Source Position: [2.18947368 4.89473684 1.5       ]
True Source Position: [2.2259074885666728, 4.4484389405386615, 1.5]


In [241]:
px.imshow(srp[1].reshape(20,20))

In [None]:
sin.shape

In [None]:
Audio(sin, rate=4000)

In [None]:
a= np.fft.rfft(sin)
# a = np.abs(a)
b = np.fft.irfft(a)


In [None]:
a

In [None]:
import plotly.express as px
import plotly.graph_objects as go


In [None]:
fig = go.Figure()
freqs = np.fft.rfftfreq(len(sin), 1/4000)

fig.add_trace(go.Scatter(x=freqs, y=np.abs(a)))
fig.add_trace(go.Scatter(x=freqs, y=np.angle(a)))

fig.show()

In [None]:
px.line( y=b)


In [None]:
# Compute the GCC-PHAT between each 2 pairs of channels
n_channels = wav.shape[0]
# cc = np.zeros((n_channels, n_channels, wav.shape[1]))
taus = np.zeros((n_channels, n_channels))
for i in range(n_channels):
    for j in range(n_channels):
        if i == j:
            continue
        tau, cc = gcc_phat(wav[i], wav[j], fs=sr)
        print(i,j,tau)
        taus[i, j] = tau


In [None]:
gcc_phat(wav[0], wav[3], fs=sr)

In [None]:
mic_pos.shape

In [189]:
import pyroomacoustics as pra

srp_phat = pra.doa.srp.SRP(mic_pos.T, fs=sr, nfft=1024, num_src=1, c=343)


In [190]:
nfft = 1024
X = np.array(
    [
        pra.transform.stft.analysis(signal, nfft, nfft // 2).T
        for signal in wav
    ]
)
srp_phat.locate_sources(X )

In [None]:
srp_phat.grid

In [None]:
import matplotlib.pyplot as plt
azimuths = np.degrees(srp_phat.grid.azimuth)  # Convert to degrees
elevations = np.degrees(srp_phat.grid.colatitude)  # Convert colatitude to elevation
srp_response = srp_phat.grid.values  # Steered response power

# Create Azimuth-Elevation Heatmap
plt.figure(figsize=(10, 6))
plt.scatter(azimuths, elevations, c=srp_response, cmap='jet', marker='s')
plt.colorbar(label="SRP-PHAT Power")
plt.xlabel("Azimuth (°)")
plt.ylabel("Elevation (°)")
plt.title("SRP-PHAT Localization Heatmap")
plt.grid()
plt.show()

In [None]:
srp_phat.grid.plot()

In [None]:
srp_phat.polar_plt_dirac()