In [None]:
import os
import numpy as np
import pandas as pd
import numpy as np
import librosa
from ketos.data_handling import selection_table as sl
from ketos.audio.audio_loader import  AudioFrameLoader, FrameStepper, audio_repres_dict
from ketos.neural_networks import load_model_file
from ketos.neural_networks.resnet import ResNetInterface
from ketos.neural_networks.dev_utils.detection import process, process_audio_loader, save_detections, merge_overlapping_detections

# File paths and annotation load

In [None]:
# Root folder of the audio recordings, folder of the saved model detections,
# and folder of the test annotations
audio_storage_root_path = '/home/sadman/projects/ctb-ruthjoy/SRKW/'
detection_file_path = '../../model_detections/'
annot_file_path = '../../annotations/test/'  # Path where test annotations are stored

# Number of classes of the evaluated detector; later cells handle the values 3 and 4
number_of_classes = 3

# =============================================================
# Uncomment one file_name to check the performance of the model
file_name = 'jasco_boundary_pass.csv'
# file_name='jasco_roberts_bank.csv'
# file_name='onc_barkley_canyon.csv'
# file_name='orcasound.csv'
# file_name='superpod_lime_kiln.csv'
# file_name='onc_barkley_canyon_test_multiclass.csv'
# file_name='jasco_malahat_vfpa_test_multiclass.csv'
# =============================================================

# Dataset name = file name without its extension; it is reused to build the other file names
file_name_except_extension = file_name[:file_name.rindex('.')]

# Load the annotation file of the selected dataset (every column is read with dtype=object)
annot_file_name = '{}_annot.csv'.format(file_name_except_extension)
annot_path_each_file = '{}{}'.format(annot_file_path, annot_file_name)
annot_df = pd.read_csv(annot_path_each_file, dtype=object)

# Load previously saved model detection file

In [None]:
# Read the detections that were previously saved for the selected test dataset
model_detection_filename = 'detections_{}.csv'.format(file_name_except_extension)
model_detections_df = pd.read_csv('{}{}'.format(detection_file_path, model_detection_filename))

# Annotation pre-processing

In [None]:
def remove_missing_files(dataframe):
    """ Remove the rows whose audio file does not exist on disk

        Every row whose 'filename' is not an existing file is reported on the standard
        output and left out of the returned table. The input dataframe is not modified.

        Args:
            dataframe: pandas.DataFrame
                List of files for testing a detector. Must have a 'filename' column
                holding the path of each file.
                
        Returns: 
            dataframe : pandas.DataFrame
                Filtered dataframe after removing the files that do not exist
    """
    # Hit the file system only once per distinct path; tables usually hold many rows per file
    exists_by_path = {path: os.path.isfile(path) for path in dataframe['filename'].unique()}

    # Positional (numpy) mask, so that rows are selected one by one even if index labels repeat
    file_exists = dataframe['filename'].map(exists_by_path).astype(bool).values

    for missing_path in dataframe.loc[~file_exists, 'filename']:
        print(missing_path, " not found")

    return dataframe[file_exists]
    

In [None]:
# Prepend the audio folder location to each file name.
# The prefix is only added when it is not there yet, so re-running this cell is harmless.
annot_filenames = annot_df['filename'].astype(str)
already_prefixed = annot_filenames.str.startswith(audio_storage_root_path)
annot_df['filename'] = annot_filenames.where(already_prefixed, audio_storage_root_path + annot_filenames)

# Keep only the files that appear in BOTH the annotation and the detection dataframes
annot_unique_filenames = annot_df['filename'].unique()
detections_unique_filenames = model_detections_df['filename'].unique()

# Remove any extra files that are in annotation dataframe but not in detection dataframe
annot_df = annot_df[annot_df['filename'].isin(detections_unique_filenames)]

# Now, remove any extra files that are in detection dataframe but not in annotation dataframe
model_detections_df = model_detections_df[model_detections_df['filename'].isin(annot_unique_filenames)]

# Report the number of distinct annotated files, i.e. the table that is actually filtered below
print("Number of files BEFORE removing missing files:", annot_df['filename'].nunique())
# Make sure each file actually exists in the cedar, if not, then remove from dataframe
annot_df = remove_missing_files(annot_df)
# Drop the same files from the detections so both tables keep describing the same set of files
model_detections_df = model_detections_df[model_detections_df['filename'].isin(annot_df['filename'].unique())]
print("Number of files AFTER removing missing files:", annot_df['filename'].nunique())

# Convert start/end time columns to numerical values (they were read with dtype=object)
annot_df = annot_df.assign(start=annot_df['start'].astype(float),
                           end=annot_df['end'].astype(float))
annot_df

# Some utility functions to re-format annotations and detections

In [None]:
def extract_annot_by_file(df_annot):
    """ Store the annotations of each file into a dictionary

        Rows are assigned to a file by exact equality of the 'filename' value, so a file
        whose name is contained in another file's name (e.g. 'a.wav' and 'xa.wav') only
        receives its own annotations.

        Args:
            df_annot: pandas.DataFrame
                DataFrame containing annotations. Must have a 'filename' column.
                
        Returns: 
            annot_by_file_dict: dict
                Separated annotations of each file by filename. Keys are the unique
                filenames in order of first appearance, values are the matching rows
                of df_annot.

    """
    filenames = df_annot['filename']
    annot_by_file_dict = {name: df_annot[filenames == name] for name in filenames.unique()}
    return annot_by_file_dict

def check_overlap(start, end, annotation_df, search_class_labels):
    """ Check if a selection has overlap in the annotation table of a specific file

        An annotation counts as a match only if its label is one of the searched labels
        AND it overlaps the selection in time, i.e. the selection start or the selection
        end falls inside the annotation, or the selection fully contains the annotation.
     
        Args:
            start: float
                Selection start time.
            end: float
                Selection end time.
            annotation_df: pandas DataFrame
                Annotation table with 'start', 'end' and 'label' columns.
            search_class_labels: list
                Class labels to search in the annotation table.

        Returns:
            overlap_found: bool
                Returns True if there is overlap found, otherwise, returns False if the specifed selection does not match in the annotation dataframe.

    """
    if len(annotation_df) == 0:
        return False

    annot_starts = annotation_df['start']
    annot_ends = annotation_df['end']

    # The three time conditions are grouped together BEFORE being combined with the label
    # filter; without the grouping the label would only restrict the containment case.
    time_overlap = (
        ((annot_starts <= start) & (annot_ends >= start))      # selection start inside annotation
        | ((annot_starts <= end) & (annot_ends >= end))        # selection end inside annotation
        | ((annot_starts >= start) & (annot_ends <= end))      # annotation inside selection
    )
    label_match = annotation_df['label'].isin(search_class_labels)

    overlap_found = bool((time_overlap & label_match).any())
    return overlap_found

def validate_annot_detections_serial(annot_df, detection_df):
    """ Function to check if the annotation and detection serial is matching (filename and start time)
        It also prints the row index if any row of the annot_df and detection_df doesn't match.

        The serial of both dataframe is required to be same so that we can easily use the scikit-learn 
        packages for performance measurements.

        Rows are compared by position, not by index label. Dataframes with a different
        number of rows are reported and considered invalid; the rows they have in common
        are still compared so that the first diverging position gets printed.
     
        Args:
            annot_df: pandas DataFrame
                Annotation table with 'filename' and 'start' columns.
            detection_df: pandas DataFrame
                Model detection dataframe with 'filename' and 'start' columns.

        Returns:
            valid_flag: bool
                Returns True if both dataframes are in the same serial, Else returns False.

    """
    # Extract the columns once instead of on every loop iteration
    annot_files = annot_df['filename'].values
    annot_starts = annot_df['start'].values
    detection_files = detection_df['filename'].values
    detection_starts = detection_df['start'].values

    valid_flag = True
    if len(annot_files) != len(detection_files):
        print("Row count mismatch: {} annotation rows vs {} detection rows".format(
            len(annot_files), len(detection_files)))
        valid_flag = False

    for i in range(min(len(annot_files), len(detection_files))):
        if annot_files[i] != detection_files[i] or annot_starts[i] != detection_starts[i]:
            print("Not equal at index ", i)
            valid_flag = False
    return valid_flag

# Get all files' duration in time (sec)

In [None]:
# Measure the duration of every annotated audio file with librosa
unique_filenames = annot_df['filename'].unique()
duration_list = [librosa.get_duration(filename=audio_path) for audio_path in unique_filenames]

# One row per file: its path and its duration
target_files_with_len = pd.DataFrame({
    'filename': unique_filenames,
    'duration': duration_list,
})

# Standardize annotations and segment annotation

In [None]:
# Length of each segment and step between consecutive segment starts
segment_length = 5.0
segment_step = 5.0

# Standardize annotation table format.
# Only the onc_barkley_canyon_test_multiclass dataset needs a column mapper
# ('sound_id_species' is mapped to 'label').
standardize_options = {'return_label_dict': True, 'trim_table': True}
if file_name_except_extension == 'onc_barkley_canyon_test_multiclass':
    map_to_ketos_annot_std = {'sound_id_species': 'label'}
    standardize_options['mapper'] = map_to_ketos_annot_std
annot, label_dict = sl.standardize(annot_df, **standardize_options)

# Split every file into segments of the chosen length/step
segmented_annot = sl.segment_files(table=target_files_with_len,
                                   length=segment_length,
                                   step=segment_step,
                                   pad=True)

# Turn the multi-index into regular columns and discard the per-row id columns
annot = annot.reset_index().drop(columns=['annot_id'])
segmented_annot = segmented_annot.reset_index().drop(columns=['sel_id'])

# Save annotations of each file in dictionary

In [None]:
# Group the standardized annotations by file: dictionary keyed by filename, so the
# annotations of a given file can be looked up directly when labelling its segments
annot_by_file_dict=extract_annot_by_file(annot)

In [None]:
# Display the label mapping returned by sl.standardize; the integer values listed in the
# per-dataset class lists of the next cell are expected to come from this mapping
label_dict # check the label mapping

# Prepare list of accepted strings for each class for the specific dataset

In [None]:
# Accepted annotation label values per class, for every supported test dataset.
# The values are the integer labels of the standardized annotation table (see label_dict).
# A class that is missing for a dataset gets an empty list; an unknown dataset gets four empty lists.
dataset_label_groups = {
    # {'BACKGROUND': 1, 'KW': 2, 'NN': 3, 'Repeat': 4, 'UN': 5}
    'jasco_boundary_pass': {'kw': [2], 'other': [1]},

    # {'FS': 1, 'HW': 2, 'HW/KW?': 3, 'KW': 4, 'KW/PWSD?': 5, 'KW?': 6, 'Noise': 7, 'PWSD': 8, 'Sonar': 9, 'UN': 10, 'Vessel Noise': 11}
    'jasco_roberts_bank': {'kw': [4, 6], 'hb': [2], 'dolphin': [8], 'other': [1, 7, 11]},

    'onc_barkley_canyon': {'kw': [4, 6], 'hb': [3], 'dolphin': [11, 13],
                           'other': [17, 18, 19, 20, 21, 22]},

    'orcasound': {'kw': [2]},

    # NOTE(review): label 1 is listed for both the KW and the other class here - confirm
    # the intended mapping against label_dict for this dataset
    'superpod_lime_kiln': {'kw': [1], 'other': [1, 4, 5]},

    # {'hb': 1, 'kw': 2, 'other': 3}
    'jasco_malahat_vfpa_test_multiclass': {'kw': [2], 'hb': [1], 'other': [3]},

    # {'D': 1, 'HB': 2, 'KW': 3}
    'onc_barkley_canyon_test_multiclass': {'kw': [3], 'hb': [2], 'dolphin': [1]},
}

selected_label_groups = dataset_label_groups.get(file_name_except_extension, {})

# Fresh list objects, so later changes never leak back into the lookup table
kw_labels = list(selected_label_groups.get('kw', []))
hb_labels = list(selected_label_groups.get('hb', []))
dolphin_labels = list(selected_label_groups.get('dolphin', []))
other_labels = list(selected_label_groups.get('other', []))

# Populate the segmented annotation table with appropriate label

In [None]:
segmented_annot['label'] = pd.Series(dtype='object')

# Class ids written to the segments: {0: 'OTHER', 1: 'KW', 2: 'HB', 3: 'D'}
# The groups are checked in this order and a later match overwrites an earlier one.
label_groups_by_priority = [(0, other_labels), (1, kw_labels), (2, hb_labels)]
# The dolphin class is only used by the four classes detector
if number_of_classes == 4:
    label_groups_by_priority.append((3, dolphin_labels))

# Populate the segmented annotation table with appropriate label
for index, row in segmented_annot.iterrows():
    annot_filtered_by_filename_df = annot_by_file_dict[row['filename']]

    for class_id, accepted_labels in label_groups_by_priority:
        # A class without accepted labels for this dataset is never searched
        if len(accepted_labels) == 0:
            continue
        if check_overlap(row['start'], row['end'], annot_filtered_by_filename_df, accepted_labels):
            segmented_annot.at[index, 'label'] = class_id

    # Consider all the remaining segments to belong to the other/background class
    if pd.isnull(segmented_annot.at[index, 'label']):
        segmented_annot.at[index, 'label'] = 0

In [None]:
# Number of segments per ground-truth class id in the segmented annotation table
segmented_annot['label'].value_counts()

In [None]:
# Number of rows per value of the 'predicted_label' column of the model detections
model_detections_df['predicted_label'].value_counts()

In [None]:
# Number of distinct files in the segmented annotation table
len(segmented_annot['filename'].unique())

In [None]:
# Number of distinct files in the model detections; compare with the count for segmented_annot
len(model_detections_df['filename'].unique())

# Validate if the annotation and model detection dataframes are in the same sequence

In [None]:
# Order both tables by file and then by start time (ascending), so that the rows
# can be compared position by position
sort_keys = ['filename', 'start']
segmented_annot = segmented_annot.sort_values(by=sort_keys)
model_detections_df = model_detections_df.sort_values(by=sort_keys)

# Shown as the cell output: True only when both tables are in the same order
validate_annot_detections_serial(segmented_annot, model_detections_df)

# Calculate performance metrics

In [None]:
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix

def calculate_performance_metrics(y_true, y_pred):
    """ Compute the overall classification metrics with scikit-learn

        Precision, recall and F1 score are computed with average='weighted'.

        Args:
            y_true: list
                Ground truth class labels.
            y_pred: list
                Predicted class labels, in the same order as y_true.

        Returns:
            tuple
                (accuracy, precision, recall, f1Score, confusion matrix), in that order.

    """
    averaging = {'average': 'weighted'}
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, **averaging),
        recall_score(y_true, y_pred, **averaging),
        f1_score(y_true, y_pred, **averaging),
        confusion_matrix(y_true, y_pred),
    )

def calculate_list_average(value_list):
    """ Compute the arithmetic mean of a sequence of numbers

        Args:
            value_list: list or numpy array
                Values to average. Must not be empty.

        Returns:
            The sum of the values divided by the number of values.

    """
    total = sum(value_list)
    count = len(value_list)
    return total / count

In [None]:
# Calculate performance
# Display name of each class id, for the supported numbers of classes
target_names_by_class_count = {
    3: ['Other', 'KW', 'HB'],
    4: ['Other', 'KW', 'HB', 'D'],
}
if number_of_classes not in target_names_by_class_count:
    raise ValueError("number_of_classes must be 3 or 4, got {}".format(number_of_classes))
target_names = target_names_by_class_count[number_of_classes]
# Class ids 0..number_of_classes-1, in the same order as target_names
class_ids = list(range(number_of_classes))

y_true = segmented_annot['label'].values
y_pred = model_detections_df['predicted_label'].values

# Passing the class ids explicitly keeps the report aligned with target_names even when
# a class is absent from both the annotations and the predictions of the selected dataset
print(classification_report(list(y_true), list(y_pred), labels=class_ids, target_names=target_names))
accuracy, precision, recall, f1Score, cnf_matrix = calculate_performance_metrics(list(y_true), list(y_pred))
print("Accuracy  : {}".format(accuracy))
print("Precision : {}".format(precision))
print("Recall : {}".format(recall))
print("f1Score : {}".format(f1Score))

print("Confusion matrix:", cnf_matrix)

# Per-class counts derived from the confusion matrix (rows: true class, columns: predicted class)
FP = cnf_matrix.sum(axis=0) - np.diag(cnf_matrix)
FN = cnf_matrix.sum(axis=1) - np.diag(cnf_matrix)
TP = np.diag(cnf_matrix)
TN = cnf_matrix.sum() - (FP + FN + TP)

# Work with floats so the ratios below are floating point divisions
FP = FP.astype(float)
FN = FN.astype(float)
TP = TP.astype(float)
TN = TN.astype(float)

# Specificity or true negative rate
TNR = TN/(TN+FP)
# Fall out or false positive rate
FPR = FP/(FP+TN)

# Both rates are per-class arrays; the unweighted mean over the classes is reported
print("Specificity or true negative rate:", calculate_list_average(TNR))
print("Fall out or false positive rate:", calculate_list_average(FPR))
