In [None]:
#test on test dataset 1 
#you can find and download this testing dataset in the test folder

import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt

def evaluate_model(model_path, test_data_path, save_path):
    # Load the saved model
    model = tf.keras.models.load_model(model_path)
    
    # Setup test data generator
    test_datagen = ImageDataGenerator(rescale=1./255)
    
    test_generator = test_datagen.flow_from_directory(
        test_data_path,
        target_size=(224, 224),
        batch_size=32,
        class_mode='binary',
        shuffle=False
    )
    
    # Get predictions
    predictions = model.predict(test_generator)
    predicted_classes = (predictions > 0.5).astype(int)
    true_classes = test_generator.classes
    
    # Calculate and print metrics
    print("\nClassification Report:")
    print(classification_report(true_classes, predicted_classes))
    
    # Calculate confusion matrix
    cm = confusion_matrix(true_classes, predicted_classes)
    print("\nConfusion Matrix:")
    print(cm)
    
    # Calculate metrics from confusion matrix
    tn, fp, fn, tp = cm.ravel()
    
    # Calculate accuracy correctly
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1_score = 2 * (precision * recall) / (precision + recall)
    
    print(f"\nDetailed Metrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1_score:.4f}")
    print(f"True Negatives: {tn}")
    print(f"False Positives: {fp}")
    print(f"False Negatives: {fn}")
    print(f"True Positives: {tp}")
    
    # Calculate ROC curve and AUC
    fpr, tpr, _ = roc_curve(true_classes, predictions)
    roc_auc = auc(fpr, tpr)
    
    # Plot ROC curve
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")
    
    # Save the plot instead of displaying it
    plt.savefig(save_path)
    plt.close()
    
    print(f"\nAUC-ROC Score: {roc_auc:.4f}")
    print(f"ROC curve plot saved to: {save_path}")
    
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'auc_roc': roc_auc,
        'confusion_matrix': cm,
        'fpr': fpr,
        'tpr': tpr
    }

# Paths to your model and test data
model_path = r'path_to_your_model'
test_data_path = r'path_to_test_dataset_1'
save_path = r'path_to_save_location'

# Run evaluation
results = evaluate_model(model_path, test_data_path, save_path)

In [None]:
#Infer on hour long audio files
#this code will run the model recursively over a folder of .wav files and generate a prediction csv for each file
#you can test our models in the 'models' folder, and run them over the test2 dataset in the 'test' folder'

import os
import librosa
import numpy as np
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import img_to_array
from PIL import Image, ImageEnhance
import csv
import matplotlib.pyplot as plt
import librosa.display
import gc
import logging
import traceback
import concurrent.futures
import threading
import matplotlib

matplotlib.use('Agg')

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Global variables
model = None
target_size = (224, 224)
brightness_factor = 0.8
contrast_factor = 2.0
batch_size = 32
duration = 5
overlap = 2.5
sample_rate = 48000

# Thread-local storage for model
thread_local = threading.local()

def get_model():
    if not hasattr(thread_local, 'model'):
        thread_local.model = load_model(r'path_to_your_model.keras')
    return thread_local.model

def create_and_process_spectrogram(y, sr):
    try:
        if len(y) == 0:
            logging.warning("Audio segment is empty")
            return None

        S = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=2048, n_mels=256, fmax=24000, hop_length=512)
        S_DB = librosa.power_to_db(S, ref=np.max)

        fig, ax = plt.subplots(figsize=(3.5, 3.5))
        img = ax.imshow(S_DB, aspect='auto', origin='lower', cmap='viridis')
        ax.axis('off')
        plt.subplots_adjust(left=0, right=1, top=1, bottom=0)

        fig.canvas.draw()
        img_array = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
        img_array = img_array.reshape(fig.canvas.get_width_height()[::-1] + (3,))
        plt.close(fig)

        img = Image.fromarray(img_array).convert('RGB')
        enhancer = ImageEnhance.Brightness(img)
        img = enhancer.enhance(brightness_factor)
        enhancer = ImageEnhance.Contrast(img)
        img = enhancer.enhance(contrast_factor)
        img = img.resize(target_size)

        return img_to_array(img)
    except Exception as e:
        logging.error(f"Error in create_and_process_spectrogram: {str(e)}")
        logging.error(traceback.format_exc())
        return None

def predict_chunk(chunk, sr, start_time):
    try:
        images = []
        segment_times = []
        segment_duration = 5
        segment_samples = int(sr * segment_duration)
        
        for i in range(0, len(chunk), int(sr * (segment_duration - overlap))):
            segment = chunk[i:i + segment_samples]
            if len(segment) < segment_samples:
                break
            segment_time = start_time + i / sr
            segment_times.append(segment_time)
            img_array = create_and_process_spectrogram(segment, sr)
            if img_array is not None:
                images.append(img_array)

        if not images:
            return []

        images = np.array(images)
        images = (images / 255.0) * 2.0 - 1.0
        model = get_model()
        predictions = model.predict(images, batch_size=batch_size)
        results = [(time, pred[0]) for time, pred in zip(segment_times, predictions)]
        
        del images
        gc.collect()
        
        return results
    except Exception as e:
        logging.error(f"Error in predict_chunk at time {start_time}: {str(e)}")
        logging.error(traceback.format_exc())
        return []

def process_file(filename, test_dir):
    file_path = os.path.join(test_dir, filename)
    try:
        logging.info(f"Started processing {filename}")
        chunk_duration = 300  # 5 minutes
        
        if not os.path.isfile(file_path) or not os.access(file_path, os.R_OK):
            logging.error(f"File does not exist or is not readable: {file_path}")
            return

        all_results = []
        start_time = 0

        while True:
            try:
                y, sr = librosa.load(file_path, sr=sample_rate, offset=start_time, duration=chunk_duration)
                if len(y) == 0:
                    break
                chunk_results = predict_chunk(y, sr, start_time)
                all_results.extend([(time, pred) for time, pred in chunk_results])
                logging.info(f"Processed chunk at {start_time} of {filename}, got {len(chunk_results)} results")
                start_time += chunk_duration
            except Exception as e:
                logging.error(f"Error processing chunk at {start_time} of {filename}: {str(e)}")
                break

        if all_results:
            positive_count = sum(1 for _, p in all_results if p > 0.01)
            logging.info(f"Detected {positive_count} potential owl calls out of {len(all_results)} segments in {filename}")
        else:
            logging.warning(f"No results produced for {filename}")

        # Write results to a CSV file for this audio file
        csv_filename = f"{os.path.splitext(filename)[0]}_predictions.csv"
        with open(csv_filename, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Start Time (s)', 'Prediction'])
            for time, prediction in all_results:
                writer.writerow([time, prediction])
        logging.info(f"Wrote {len(all_results)} results to {csv_filename}")

    except Exception as e:
        logging.error(f"Error processing {filename}: {str(e)}")
        logging.error(traceback.format_exc())

def main():
    test_dir = r'C:\Users\calla\Dropbox\2024\powerfulowl\2025_post_clusterv1\audio'
    logging.info(f"Looking for WAV files in: {test_dir}")

    if not os.path.exists(test_dir):
        logging.error(f"Directory does not exist: {test_dir}")
        return

    wav_files = [f for f in os.listdir(test_dir) if f.endswith('.wav')]
    
    if not wav_files:
        logging.warning(f"No WAV files found in {test_dir}")
        return

    logging.info(f"Found {len(wav_files)} WAV files: {', '.join(wav_files)}")

    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            futures = [executor.submit(process_file, filename, test_dir) for filename in wav_files]
            concurrent.futures.wait(futures)
        
        logging.info("All files have been processed.")
    except Exception as e:
        logging.error(f"Error in main function: {str(e)}")
        logging.error(traceback.format_exc())

if __name__ == '__main__':
    main()

In [None]:
#generate snippets - fix to make recursive

import pandas as pd
from pydub import AudioSegment
import os
import glob
import numpy as np

# Define the paths
folder_path = r'C:\Users\calla\Dropbox\2024\powerfulowl\2025_post_cluster_v5\5.1'
output_folder = os.path.join(folder_path, 'snippets/')

# Ensure output folder exists
os.makedirs(output_folder, exist_ok=True)

# Function to extract snippets
def extract_snippets(csv_file, threshold=0.5, duration=5000, margin=500):
    base_name = os.path.splitext(os.path.basename(csv_file))[0].replace('_predictions', '')
    wav_file = os.path.join(folder_path, base_name + '.WAV')
    
    if not os.path.exists(wav_file):
        print(f"WAV file for {csv_file} not found.")
        return
    
    # Read the CSV
    df = pd.read_csv(csv_file)
    
    # Load the WAV file
    audio = AudioSegment.from_wav(wav_file)
    
    # Find segments above threshold
    df['above_threshold'] = df['Prediction'] > threshold
    
    # Group consecutive segments
    df['group'] = (df['above_threshold'] != df['above_threshold'].shift()).cumsum()
    
    # Process each group of segments
    for group, group_df in df[df['above_threshold']].groupby('group'):
        start_time = group_df['Start Time (s)'].min()
        end_time = group_df['Start Time (s)'].max() + 5  # Add 5 seconds for the last segment
        max_prediction = group_df['Prediction'].max()
        
        # Calculate start and end times in milliseconds
        start_ms = max(0, (start_time - margin/1000) * 1000)
        end_ms = min((end_time + margin/1000) * 1000, len(audio))
        
        # Extract snippet
        snippet = audio[start_ms:end_ms]
        
        # Create output filename
        output_filename = f"{base_name}_{start_time:.2f}-{end_time:.2f}_score{max_prediction:.4f}.wav"
        output_path = os.path.join(output_folder, output_filename)
        
        # Export snippet
        snippet.export(output_path, format="wav")
        print(f"Exported: {output_filename}")

# List all CSV files in the folder
csv_files = glob.glob(os.path.join(folder_path, '*_predictions.csv'))

# Process each CSV file
for csv_file in csv_files:
    print(f"Processing: {csv_file}")
    extract_snippets(csv_file)

print("Snippets extraction for all files completed.")