In [5]:
from compare import match_audio, LossType
from Preprocessing.pre_processing import *
from Preprocessing.sliding_windows import create_sliding_windows

import pandas as pd
import torch

RECORDINGS_METADATA_PATH = './recording_examples/recordings_metadata.csv'


In [6]:
# Load IPython's autoreload extension and set mode 2, which re-imports modules
# before each cell runs so edits to the imported project .py files are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Preparing files to evaluate

Install the following package on your machine (feel free to create a local environment): `gitpython`.

Then run `python3 generate_csv_from_recordings.py`.

#### Recordings_metadata headers:
- `original_recording_id`: Generated unique id of the original recording, truncated from its file path for conciseness
- `example_recording_id`: Generated unique id of the example recording, truncated from its file path for conciseness
- `example_category`: The folder (category) that the recorded example belongs to; one of `samples`, `exact_recordings`, or `imperfect_examples`.
  - `samples`: Recordings in which only a phrase from the original recording is spoken.
  - `exact_recordings`: Recordings in which the full sentence from the original recording is repeated.
  - `imperfect_examples`: Hard examples whose sentences contain key terms from the original recording but do not match it exactly:
    - Original recording transcription: They went through his files and they didn't find anything **vs.**
    - Example recording transcription: They didn't find the files
- `original_transcript`: Transcription of the original recording
- `example_transcript`: Transcription of the recorded example
- `path_to_original`: Absolute path on the DICE machine to the original recording file
- `path_to_example`: Absolute path to each example, assuming root is the top of the GitHub repository



## Evaluate recordings

In [7]:
class Recording():
    """A single audio recording reduced to one channel, plus its metadata id.

    Attributes:
        fs: The rate value returned by the audio reader for this recording
            (passed on later as the ``sampling_rate`` when resampling).
        unprocessed_data: One-dimensional sample data. For 2-D input only
            column 0 is kept; 1-D input is stored unchanged.
        id: Identifier that links the recording back to its metadata CSV row.
    """

    def __init__(self, fs, unprocessed_data, recording_id):
        """Store the rate, the first channel of the samples, and the id.

        Args:
            fs: Rate value reported when the audio was read.
            unprocessed_data: Sample array, either 1-D (single channel) or
                2-D with channels along the second axis.
                NOTE(review): assumes column 0 is representative of the
                recording; the other channels are discarded -- confirm.
            recording_id: Identifier used to refer back to the CSV metadata.
        """
        self.fs = fs
        # Single-channel audio has no second axis, so `[:, 0]` would raise
        # IndexError. Objects without an `ndim` attribute keep the original
        # column-0 indexing.
        if getattr(unprocessed_data, "ndim", 2) == 1:
            self.unprocessed_data = unprocessed_data
        else:
            self.unprocessed_data = unprocessed_data[:, 0]
        self.id = recording_id  # Identifier in case we want to refer to csv metadata
        


class Evaluation():
    """Compare original recordings against re-recorded examples.

    Each row of the metadata CSV names one original recording and one example
    recording. The class loads both, resamples them to a common rate,
    optionally trims silence, cuts them into sliding windows, and scores the
    windows against each other with ``match_audio``.

    NOTE(review): ``read_audio``, ``resample_audio``, ``librosa`` and ``np``
    are not imported explicitly in this notebook; presumably they come from
    ``from Preprocessing.pre_processing import *`` -- confirm. ``display`` is
    the IPython display helper available in notebook sessions.
    """

    def __init__(self, path_to_metadata=RECORDINGS_METADATA_PATH):
        """Load the recordings metadata.

        Args:
            path_to_metadata: Path to the recordings metadata CSV.
        """
        self.metadata_df = self.read_recordings_metadata(path_to_metadata)
        self.metadata_columns = self.metadata_df.columns
        # Intended common sampling rate for both audio files. No method in
        # this class reads it yet; the methods take `target_rate` instead.
        self.RATE = 24000
        # Reserved for MFCC preprocessing; not used at the moment.
        self.n_mfcc = None

    def read_recordings_metadata(self, path):
        """Read the metadata CSV at ``path`` into a DataFrame and return it."""
        return pd.read_csv(path)

    def read_all_recordings(self):
        """Read every original/example pair listed in the metadata.

        Originals are read with format ``"mp4"`` and examples with ``"m4a"``.

        Returns:
            A tuple ``(recordings_truth, recordings_test)`` of two lists of
            ``Recording`` objects. Entries at the same position come from the
            same metadata row.
        """
        recordings_truth, recordings_test = [], []
        for _, row in self.metadata_df.iterrows():
            path_original, path_example = row['path_to_original'], row['path_to_example']
            id_original, id_example = row['original_recording_id'], row['example_recording_id']
            truth_fs, truth_data = read_audio(path_original, "mp4")
            test_fs, test_data = read_audio(path_example, "m4a")

            recordings_truth.append(Recording(truth_fs, truth_data, id_original))
            recordings_test.append(Recording(test_fs, test_data, id_example))

        return recordings_truth, recordings_test

    def get_transcripts(self, current_example_recording_id):
        """Display the metadata row(s) whose ``example_recording_id`` matches.

        The rows are shown with ``display``; nothing is returned.
        """
        matches_id = self.metadata_df['example_recording_id'] == current_example_recording_id
        display(self.metadata_df.loc[matches_id])

    ### Start of preprocessing
    ## TODO: some stuff could be moved to the preprocessing code.

    ## TODO: Probably move this to preprocessing instead
    def trim_silence(self, data_original, data_test, top_db=40):
        """Trim leading and trailing silence from both signals.

        Args:
            data_original: Samples of the original recording.
            data_test: Samples of the example recording.
            top_db: Threshold passed to ``librosa.effects.trim`` for both
                signals. Defaults to 40, the value previously hard-coded.

        Returns:
            A tuple ``(data_original, data_test)`` of the trimmed signals.
        """
        data_original, _ = librosa.effects.trim(data_original, top_db=top_db)
        data_test, _ = librosa.effects.trim(data_test, top_db=top_db)
        return data_original, data_test

    def preprocess_truth_and_test_data(self, original_unprocessed_data, original_framerates, test_unprocessed_data, test_framerates, target_rate=24000, N_MFCC=None, trim_silence=True):
        """Resample, optionally trim, and window an original/example pair.

        Args:
            original_unprocessed_data: Raw samples of the original recording.
            original_framerates: Sampling rate of the original recording.
            test_unprocessed_data: Raw samples of the example recording.
            test_framerates: Sampling rate of the example recording.
            target_rate: Rate both signals are resampled to.
            N_MFCC: Placeholder for MFCC preprocessing; currently ignored.
            trim_silence: When true, silence is trimmed from both signals.

        Returns:
            A tuple ``(original_windows, test_windows)`` produced by
            ``create_sliding_windows`` with a shared window and step size.

        Raises:
            ValueError: If the shorter signal has fewer than two samples, so
                no positive window size can be derived from it.
        """
        # TODO: call the to_mfcc function from the pre_processing library.
        if N_MFCC is not None:
            pass

        # Resample both audio signals to the same sample rate.
        data_original = resample_audio(original_unprocessed_data, sampling_rate=original_framerates, target_rate=target_rate)
        data_test = resample_audio(test_unprocessed_data, sampling_rate=test_framerates, target_rate=target_rate)
        if trim_silence:
            data_original, data_test = self.trim_silence(data_original, data_test)
        print(len(data_original))
        print(len(data_test))

        # The window is one sample shorter than the shorter of the two
        # signals (normally the test recording).
        window_size = min(len(data_test), len(data_original)) - 1
        if window_size < 1:
            raise ValueError(
                "Recordings are too short to window after preprocessing: "
                "lengths {} and {}".format(len(data_original), len(data_test))
            )
        # Half-window step, but never 0: a window of one sample would
        # otherwise give a step of 0.
        step_size = max(1, window_size // 2)

        original_windows = create_sliding_windows(data_original, window_size=window_size, step_size=step_size)
        test_windows = create_sliding_windows(data_test, window_size=window_size, step_size=step_size)
        return original_windows, test_windows

    #### Start of evaluation

    def compute_loss(self, original_windows, test_windows, test_recording_id, loss_type):
        """Score test windows against original windows and print the minimum.

        Prints both window array shapes, displays the metadata row for
        ``test_recording_id``, then prints the smallest loss returned by
        ``match_audio`` and where it occurs.

        Args:
            original_windows: Sliding windows of the original recording.
            test_windows: Sliding windows of the example recording.
            test_recording_id: ``example_recording_id`` used to look up the
                metadata row to display.
            loss_type: Loss selector forwarded to ``match_audio``.
        """
        print("Original recording sliding window dim: ", original_windows.shape)
        print("Test recording sliding window dim: ", test_windows.shape)
        # Display the metadata of the corresponding row.
        self.get_transcripts(test_recording_id)

        losses = match_audio(torch.Tensor(test_windows), torch.Tensor(original_windows), loss_type=loss_type)
        # Keep every loss value. Reducing to the single global minimum before
        # searching made the reported position always index 0.
        losses = losses.detach().cpu().numpy()
        min_loss = losses.min()

        # Label the output with the loss actually used rather than a fixed
        # "MAE". Falls back to str() if loss_type has no `name` attribute.
        loss_name = getattr(loss_type, "name", str(loss_type))
        print("Minimal " + loss_name + ": ", min_loss)
        # Indices are positions in the array returned by match_audio.
        # NOTE(review): presumably the leading axis enumerates windows of the
        # original recording -- confirm against match_audio.
        print("Window with minimal " + loss_name + ": ", np.where(losses == min_loss))

    ## Function that combines both the preprocess function and compute_loss.
    def evaluate_two_audio_data(self, original_unprocessed_data, original_fs, test_unprocessed_data, test_fs, test_recording_id, loss_type = LossType.MAE, target_rate=24000, N_MFCC=None, trim_silence=True):
        """Preprocess one original/example pair and print its minimal loss.

        Args:
            original_unprocessed_data: Raw samples of the original recording.
            original_fs: Sampling rate of the original recording.
            test_unprocessed_data: Raw samples of the example recording.
            test_fs: Sampling rate of the example recording.
            test_recording_id: ``example_recording_id`` of the example, used
                to display its metadata row.
            loss_type: Loss selector forwarded to ``match_audio``.
            target_rate: Rate both signals are resampled to.
            N_MFCC: Placeholder for MFCC preprocessing; currently ignored.
            trim_silence: When true, silence is trimmed from both signals.
        """
        print("*****")
        original_windows, test_windows = self.preprocess_truth_and_test_data(
            original_unprocessed_data,
            original_fs,
            test_unprocessed_data,
            test_fs,
            target_rate,
            N_MFCC,
            trim_silence,
        )
        self.compute_loss(original_windows, test_windows, test_recording_id, loss_type)

    





##Utility Functions
def reshape_preprocessed_audio(data):
    """Return ``data`` reshaped so that its first two dimension sizes swap.

    An input of shape ``(rows, cols)`` comes back with shape ``(cols, rows)``.

    NOTE(review): this uses ``reshape``, which for NumPy-style arrays regroups
    the elements in their existing order rather than transposing them. Confirm
    that this, and not ``data.T``, is what callers expect.
    """
    n_cols = data.shape[1]
    n_rows = data.shape[0]
    swapped_shape = (n_cols, n_rows)
    return data.reshape(swapped_shape)


In [8]:
# Gets all recordings, unprocessed
# Evaluation() is built with its default metadata path (RECORDINGS_METADATA_PATH).
# read_all_recordings() returns two lists of Recording objects; entries at the same
# position come from the same metadata row (original vs. example).
EvalEngine = Evaluation()
recordings_truth, recordings_test = EvalEngine.read_all_recordings()

### TEST SIMPLE ARRAY
**TODO:** Move this check into a unit test instead.

In [9]:
### TEST SIMPLE ARRAY
# Sanity check of create_sliding_windows on small hand-written 4x4 inputs.
# simple_test is simple_truth with 1 added to every element, so the two sets
# of windows are easy to compare by eye in the printed output.
simple_truth= [[0,1,2,3],[4,5,6,7],[8,9,10,11],[12,13,14,15]]
simple_test = [[1,2,3,4],[5,6,7,8],[9,10,11,12],[13,14,15,16]]
# Same window and step size for both, so the windows line up one-to-one.
truth_windows = create_sliding_windows(simple_truth, window_size=2, step_size=1)
test_windows = create_sliding_windows(simple_test, window_size=2, step_size=1)
print(truth_windows)
print("******")
print(test_windows)

[[[ 0  4]
  [ 1  5]
  [ 2  6]
  [ 3  7]]

 [[ 4  8]
  [ 5  9]
  [ 6 10]
  [ 7 11]]

 [[ 8 12]
  [ 9 13]
  [10 14]
  [11 15]]]
******
[[[ 1  5]
  [ 2  6]
  [ 3  7]
  [ 4  8]]

 [[ 5  9]
  [ 6 10]
  [ 7 11]
  [ 8 12]]

 [[ 9 13]
  [10 14]
  [11 15]
  [12 16]]]


In [10]:
# Show the test window at index 2 on its own; the next cell uses it as the
# single query window passed to match_audio.
print(test_windows[2])

[[ 9 13]
 [10 14]
 [11 15]
 [12 16]]


In [11]:
# Compare the single test window at index 2 against all truth windows.
# No loss_type is passed here, so match_audio's own default applies.
match_audio(torch.Tensor(test_windows[2]),torch.Tensor(truth_windows))

tensor([[9.],
        [5.],
        [1.]])