# Translation of Matlab Code

Does:
- run GCC-PHAT on each pairing to get a phase angle
- convert phase angle to geometric angle using geometry

Does not:
- Apply differential smoothing to phase angles
- Provide an alternative algo. for GCC-PHAT (XC)
- Use a leaky filter on geometric angles


In [1]:
# --- IMPORTS ---

from pathlib import Path
import pandas as pd

# For GCC_PHAT calc
import numpy as np
import matplotlib.pyplot as plt
from scipy.fftpack import rfft, irfft, fftfreq, fft, ifft
import math

from itertools import combinations
from matplotlib import animation

import subprocess
import shlex
import glob

from IPython.display import Video

In [2]:
# --- An Apache-2.0 Licensed GCC-PHAT algorithm ---

"""
 Estimate time delay using GCC-PHAT 
 Copyright (c) 2017 Yihui Xiong

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
"""

import numpy as np


def gcc_phat(sig, refsig, fs=1, max_tau=None, interp=16):
    '''
    This function computes the offset between the signal sig and the reference signal refsig
    using the Generalized Cross Correlation - Phase Transform (GCC-PHAT)method.
    '''
    
    # make sure the length for the FFT is larger or equal than len(sig) + len(refsig)
    n = sig.shape[0] + refsig.shape[0]

    # Generalized Cross Correlation Phase Transform
    SIG = np.fft.rfft(sig, n=n)
    REFSIG = np.fft.rfft(refsig, n=n)
    R = SIG * np.conj(REFSIG)

    cc = np.fft.irfft(R / np.abs(R), n=(interp * n))

    max_shift = int(interp * n / 2)
    if max_tau:
        max_shift = np.minimum(int(interp * fs * max_tau), max_shift)

    cc = np.concatenate((cc[-max_shift:], cc[:max_shift+1]))

    # find max cross correlation index
    shift = np.argmax(np.abs(cc)) - max_shift

    tau = shift / float(interp * fs)
    
    return tau, cc

In [3]:
def run_anim(test_0):
    
    # NOTE: a "frame" is the smallest unit of sampled time. (aka the data is a timeseries indexed by frame count).
    # For quick testing iteration, shrink this (aka use only the first fraction of data).
    N_TOTAL_FRAMES = 480000
    
    # a "chunk" is a group of frames considered at a time for correlation / filtering.
    N_FRAMES_PER_CHUNK = 480
    
    # Chunks needn't be discrete; they can overlap. If N_ADVANCE == N_F_PER_CHUNK then they are discrete.
    N_ADVANCE_PER_CHUNK = 480
    
    # Frequency is always the same for our tests.
    FREQUENCY_HZ = 48000
    
    """Load a test from a given test folder, and compute the phase angles / true angles over time. Animate."""
    
    print(f"Running for test {test_0}...")
    
    pressure_data = pd.read_csv(test_0 / "pressures.csv", index_col=0, header=0)
    # display(pressure_data)
    
    positions = pd.read_csv(test_0 / "positions.csv", index_col=0, header=0)
    # display(positions)

    def n_frames_to_ms(n_frames):
        """Convert from some amount of frames to time in milliseconds"""
        return n_frames/FREQUENCY_HZ*1000
    
    mic_pairs = list(combinations(range(positions.shape[0]), 2))
    
    all_pairwise_delays = []
    all_delays_flattened = []
    all_pairwise_angles = []
    all_angles_per_chunk = []
    
    # We process frequency timeseries in smaller chunks to get one estimated DOA angle per chunk of time.
    n_steps = 0
    for frame_idx in range(0, N_TOTAL_FRAMES - N_FRAMES_PER_CHUNK, N_ADVANCE_PER_CHUNK):
        n_steps += 1
        
        # Consider this chunk.
        this_chunk_pressure_data = pressure_data.iloc[frame_idx:frame_idx+N_FRAMES_PER_CHUNK]
        this_chunk_delays = []
        this_chunk_pairwise_delays = []
        this_chunk_pairwise_angles = []
        this_chunk_flattened_angles = []
        
        # Here's where we do the heavy lifting in computing angles for each pairing!
        for pair_idx, (mic1_idx, mic2_idx) in enumerate(mic_pairs):
            mic1_pressure_header, mic2_pressure_header = this_chunk_pressure_data.columns[mic1_idx], this_chunk_pressure_data.columns[mic2_idx]

            s1, s2 = this_chunk_pressure_data[mic1_pressure_header], this_chunk_pressure_data[mic2_pressure_header]
            
            # ----- COMPUTE PHASE ANGLE BETWEEN THIS PAIR -------
            delay, _ = gcc_phat(s1, s2, FREQUENCY_HZ)
            
            # Prep for geometric angle calc.
            mic1_pos = positions.iloc[mic1_idx]
            mic2_pos = positions.iloc[mic2_idx]
            m1x, m1y = tuple(mic1_pos)
            m2x, m2y = tuple(mic2_pos)
            
            
            # ---- PHASE ANGLE TO GEOMETRIC ANGLE CALCULATION ----
            
            # This was the MatLab implementation I based this off of. But I also
            # kind of derived the setup from scratch.
            
            # [d,a]=distang(micpos(i,:),frequency_hz; %Calculate distance and angle between mics
            # ra=(1:360)*pi/180-a; %Create angle vector, rotate based on mic position
            # ds=d*cos(ra)*fs/343; %Convert to sample delay
            # angularIndex(:,n)=round(ds)+bufferSize/2 +1; % Shift to put zero index in the middle


            dist, pair_edge_angle = np.sqrt((m1x-m2x)**2 + (m1y-m2y)**2), math.atan2(m2x-m1x, m2y-m1y)
            speed_times_time = 343*delay/dist
            ang_relative_to_mic_edge = np.arccos(speed_times_time if np.abs(speed_times_time) <= 1 else 1 * np.sign(speed_times_time))
            geometric_angles = (ang_relative_to_mic_edge - pair_edge_angle) % (2*np.pi), (ang_relative_to_mic_edge + pair_edge_angle)  % (2*np.pi)
            
            # ------------------------------------------------------
            
            this_chunk_delays.append(delay)
            this_chunk_pairwise_delays.append(delay)
            this_chunk_pairwise_angles.append(geometric_angles)
            this_chunk_flattened_angles += list(geometric_angles)
                    
        all_pairwise_delays.append(this_chunk_pairwise_delays)
        all_delays_flattened += this_chunk_delays
        all_pairwise_angles.append(this_chunk_pairwise_angles)
        all_angles_per_chunk.append(this_chunk_flattened_angles)
     
    # For tuning the animation
    ANIM_LEN_SEC = 10 # 10

    # For the angle est. line in the animation
    LINE_LEN = 0.15
     
    ANGLE_LINE_ALPHA = 0.1
    ANGLE_LINE_LW = 3
    
    EXAGGERATION = 2.
    
    MIC_EDGE_COLOR = (1, 0, 0)
    MIC_EDGE_OPACITY = 0.6
    ANGLE_LINE_COLOR = (0, 1, 0)

    # Move the angle drawing right so it doesn't overlap
    # the red lines as much (so we don't misinterpret dark overlap as a high-conf angle)
    ANGLE_LINE_X_OFFS = 0.02
     
    # Create an animation 
    fig, ax = plt.subplots()
    ax.set_title(f"{test_0}")
    ax.set(title=f"{test_0}", xlim=(-0.22, 0.16), ylim=(-0.16, 0.16))
    t_text = ax.text(0.007, 0.03, "t")
        
    # Setup the orignal line color and positions of the animation.
    # Use the last chunk done (bound to "this_chunk_pressure_data") to grab headers.
    lines = []

    for mic1_idx, mic2_idx in mic_pairs:
        mic1_pos = positions.iloc[mic1_idx]
        mic2_pos = positions.iloc[mic2_idx]
        m1x, m1y = tuple(mic1_pos)
        m2x, m2y = tuple(mic2_pos)
        
        # angle, symmetric_angle = this_chunk_pairwise_angles[pair_idx]

        
        # move edges right by factor of line length to remove overlaps
        xo = np.sqrt((m1x - m2x)**2 + (m1y-m2y)**2) * 0.03
        
        # Graph the mic edge
        mic_edge_line, = ax.plot([m1x+xo, m2x+xo], [m1y, m2y], c=MIC_EDGE_COLOR)
        

        
        # Graph possible angle and its symmetry
        angle_line, = ax.plot(0, 0, c=ANGLE_LINE_COLOR, lw=ANGLE_LINE_LW, alpha=ANGLE_LINE_ALPHA)
        angle_symmetric_line, = ax.plot(0, 0, c=ANGLE_LINE_COLOR, lw=ANGLE_LINE_LW, alpha=ANGLE_LINE_ALPHA)
        lines.append((mic_edge_line, angle_line, angle_symmetric_line))
  
    max_delay, min_delay = max(all_delays_flattened), min(all_delays_flattened)
 
    def update(chunk_idx):
        """For our animation. For each chunk create one animation frame, in which we update the line colors."""

        frame_idx = chunk_idx*N_ADVANCE_PER_CHUNK

        this_chunk_pairwise_delays = all_pairwise_delays[chunk_idx]
        this_chunk_pairwise_angles = all_pairwise_angles[chunk_idx]
        
        
        # For each frame, we need to recolor each edge based on the delays of this chunk.
        for pair_idx, (mic1_idx, mic2_idx) in enumerate(mic_pairs):
            delay = this_chunk_pairwise_delays[pair_idx]
            
            # Since all time delays are relatively close to eachother,
            # scale the color by first normalizing then putting through an exponential.
            # Makes higher values *much* more intense.
            alpha = 1 if max_delay == min_delay else np.exp(EXAGGERATION*(delay - min_delay) / (max_delay - min_delay)) / (np.exp(EXAGGERATION))

 
            mic_edge_line, angle_line, angle_symmetric_line = lines[pair_idx]
 
            mic_edge_line.set_alpha(alpha*MIC_EDGE_OPACITY)
            mic_edge_line.set_color(MIC_EDGE_COLOR)
            

            
            angle, symmetric_angle = this_chunk_pairwise_angles[pair_idx]
            angle_line.set_xdata([ANGLE_LINE_X_OFFS, ANGLE_LINE_X_OFFS + LINE_LEN*np.cos(angle)])
            angle_line.set_ydata([0, LINE_LEN*np.sin(angle)])
            angle_symmetric_line.set_xdata([ANGLE_LINE_X_OFFS, ANGLE_LINE_X_OFFS + LINE_LEN*np.cos(symmetric_angle)])
            angle_symmetric_line.set_ydata([0, LINE_LEN*np.sin(symmetric_angle)])
            
        t_text.set_text((
            f"Step # {chunk_idx}\n"
            f"t={n_frames_to_ms(frame_idx):.2f}ms\n"
            f"Frame # {frame_idx}\n"
            f"Chunk size:\n  {N_FRAMES_PER_CHUNK} frames ({n_frames_to_ms(N_FRAMES_PER_CHUNK):.2f}ms)\n"
            f"frame adv. per chunk:\n  {N_ADVANCE_PER_CHUNK} frames ({n_frames_to_ms(N_ADVANCE_PER_CHUNK):.2f}ms)\n"
            "Darker=longer delay"
        ))
            
        return *lines, t_text,
    
    # Save the animation.
    ani = animation.FuncAnimation(fig, update, frames=n_steps, interval=ANIM_LEN_SEC * 1000 / n_steps)
    test_0_same_dir= test_0.relative_to(*test_0.parts[:1])
    ani.save(f"{test_0_same_dir}_anim.mp4")
    plt.close()
    


In [4]:
def run_all_test_anims():
    """Run and animate each timeseries test, with angles computed. Outputs saved alongside this file."""
    test_dirs = [f for f in Path("Data").iterdir() if f.is_dir()]

    for test in test_dirs:  
        run_anim(test)

In [5]:
run_all_test_anims()

Running for test Data/test_01_white_noise_0_fwd...


  cc = np.fft.irfft(R / np.abs(R), n=(interp * n))


Running for test Data/test_02_white_noise_45_left...


  cc = np.fft.irfft(R / np.abs(R), n=(interp * n))


Running for test Data/test_03_white_noise_90_left...


  cc = np.fft.irfft(R / np.abs(R), n=(interp * n))


Running for test Data/test_04_engine_noise_no_talking...


  cc = np.fft.irfft(R / np.abs(R), n=(interp * n))


Running for test Data/test_06_engine_noise_talking...


  cc = np.fft.irfft(R / np.abs(R), n=(interp * n))


In [None]:
def combine_test_videos(output_video_name):
    """Create a unified video that combines the test videos.
    
    This function suppresses errors during running ffmpeg, so
    if it's not working, make sure you have ffmpeg installed on 
    your computer.
    """
    
    files = []
    
    # Grab singular test video files in this directory.
    for f in glob.glob("test_*.mp4", root_dir="."):
        files.append(f)
    
    # Only combine if all the test files are produced correctly.    
    if len(files) >= 5:
        subprocess.run(shlex.split(f"""ffmpeg -y \
            -i {files[0]} -i {files[1]} \
            -i {files[2]} -i {files[3]} \
            -i {files[4]} -i {files[4]} \
            -filter_complex \
            "[0:v][1:v][2:v]hstack=inputs=3[top];\
            [3:v][4:v][5:v]hstack=inputs=3[bottom];\
            [top][bottom]vstack=inputs=2[v]" \
            -map "[v]" \
            {output_video_name}
        """),
            stdout = subprocess.DEVNULL,
            stderr = subprocess.DEVNULL
        )

        


In [None]:
COMBINED_NAME = "all_tests_anim.mp4"
combine_test_videos(COMBINED_NAME)

Video(COMBINED_NAME)