In [5]:
import os
import pandas as pd
import numpy as np
from pt_utils import *
from pt_dataset import *
from pt_models import *
from pt_utils import *
import torch
from torch.utils.data import DataLoader
from datetime import datetime
from transformers import Wav2Vec2Processor,Wav2Vec2FeatureExtractor,AutoModel
from tensorboardX import SummaryWriter
from pt_utils import load_data, prepare_data, reshaping_data_for_model, unsplit_data_ogsize
from pt_dataset import BreathingDataset
import scipy.stats
from torch.cuda.amp import autocast

def create_run_directory():
    base_dir = "pt_runs"
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    run_dir = os.path.join(base_dir, timestamp)
    os.makedirs(run_dir, exist_ok=True)
    return run_dir

def _calculate_flattened_accuracy(average, ground_truth_labels):
    s_acc = 0
    for b in range(len(ground_truth_labels)):
        s, _ = scipy.stats.pearsonr(average[b], ground_truth_labels[b])
        s_acc += s
    return s_acc / len(ground_truth_labels)

def _choose_real_labs_only_with_filenames(labels, filenames):
    return labels[labels['filename'].isin(filenames)]

def _get_ground_truth_labels(ground_truth_names, labels):
    ground_truth_labels = []
    for batch_name in ground_truth_names:
        ground_truth_label = _choose_real_labs_only_with_filenames(labels, [batch_name])
        ground_truth_labels.append(ground_truth_label)
    return np.array(ground_truth_labels)[:, :, -1].astype(np.float32)

def prepare_test_datasets(path_to_test_data, path_to_test_labels, window_size=16, step_size=6, batch_size=10, processor=None):
    """
    Load and prepare test datasets, saving them for later use
    """
    # Parameters
    length_sequence = window_size 
    step_sequence = step_size

    # Load and prepare test data
    test_data, test_labels, test_dict, frame_rate = load_data(path_to_test_data, path_to_test_labels, 'test')
    prepared_test_data, prepared_test_labels, prepared_test_labels_timesteps = prepare_data(
        test_data, test_labels, test_dict, frame_rate, 
        length_sequence * 16000, step_sequence * 16000
    )

    # Reshape data
    test_d, test_lbs = reshaping_data_for_model(prepared_test_data, prepared_test_labels)
    print(f"Test data shape: {test_d.shape}")

    # Create dataset
    test_dataset = BreathingDataset(test_d, test_lbs, processor, window_size, step_sequence)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=3, collate_fn=test_dataset.collate_fn)
    
    # Save the prepared data
    save_path = f'prepared_test_data_{datetime.now().strftime("%Y%m%d-%H%M%S")}.npz'
    np.savez_compressed(save_path, 
                       test_labels=test_labels,
                       test_dict=test_dict,
                       prepared_test_labels_timesteps=prepared_test_labels_timesteps,
                       output_size=prepared_test_labels.shape[-1])
    
    print(f"Saved prepared data to {save_path}")
    
    return test_loader, save_path

def run_model_inference(test_loader, prepared_data_file, model_path=None, config=None):
    """
    Run model inference using prepared data loader
    """
    # Load prepared data info
    data_info = np.load(prepared_data_file, allow_pickle=True)
    config["output_size"] = int(data_info['output_size'])
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Load model
    model = config["model"](config)
    model.load_state_dict(torch.load(model_path))
    model = model.to(device)
    model = model.half()

    # Evaluate model
    model.eval()
    test_pred = []
    test_loss = 0.0
    progress_bar = tqdm(test_loader, desc=f"Test")

    with torch.no_grad():
        for batch_d, batch_lbs in progress_bar:
            with torch.amp.autocast(device_type="cuda"):
                batch_d = batch_d.to(device)
                batch_lbs = batch_lbs.to(device)
                batch_d = model(batch_d)
                loss = correlation_coefficient_loss(batch_d, batch_lbs)
            
            test_loss += loss.item()
            test_pred.extend(batch_d.float().cpu().numpy())
            
            progress_bar.set_postfix({'test loss: ': f'{test_loss/(progress_bar.n+1):.4f}'})
            
            del loss, batch_d, batch_lbs
            torch.cuda.empty_cache()

    test_loss /= len(test_loader)
    
    # Get shape from saved data
    prepared_test_labels_timesteps = data_info['prepared_test_labels_timesteps']
    test_pred = np.array(test_pred).reshape(prepared_test_labels_timesteps.shape)
    
    # Save predictions and necessary data
    save_path = f'model_predictions_{datetime.now().strftime("%Y%m%d-%H%M%S")}.npz'
    np.savez_compressed(save_path,
                       predictions=test_pred,
                       test_labels=data_info['test_labels'],
                       test_dict=data_info['test_dict'],
                       prepared_test_labels_timesteps=prepared_test_labels_timesteps,
                       test_loss=test_loss)
    
    print(f"Saved predictions to {save_path}")
    return save_path

def calculate_metrics(predictions_file, run_dir=None):
    """
    Calculate metrics from saved predictions
    """
    if run_dir is None:
        run_dir = create_run_directory()
    
    log_dir = os.path.join(run_dir, "logs")
    os.makedirs(log_dir, exist_ok=True)
    
    # Load saved predictions and data
    data = np.load(predictions_file, allow_pickle=True)
    test_pred = data['predictions']
    test_labels = data['test_labels']
    test_dict = data['test_dict'].item()  # Convert numpy object array to dict
    prepared_test_labels_timesteps = data['prepared_test_labels_timesteps']
    test_loss = float(data['test_loss'])

    # Calculate metrics
    test_ground_truth = _get_ground_truth_labels(list(test_dict.values()), test_labels)
    test_pred_flat = concatenate_prediction(test_labels, test_pred, prepared_test_labels_timesteps, test_dict)
    test_prc_coef = _calculate_flattened_accuracy(test_pred_flat, test_ground_truth)

    # Print results
    print("\nEvaluation completed.")
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Pearson Coefficient (flattened): {test_prc_coef:.4f}")

    # Log with tensorboard
    writer = SummaryWriter(log_dir=log_dir)
    writer.add_scalar("Test/loss", test_loss, 0)
    writer.add_scalar("Test/pearson_coef", test_prc_coef, 0)
    
    test_table = "| Metric | Value |\n" \
                 "|--------|-------|\n" \
                 f"| Test Loss | {test_loss:.4f} |\n" \
                 f"| Test Pearson Coefficient | {test_prc_coef:.4f} |\n"
    writer.add_text("Test_Metrics", test_table)
    writer.close()

    # Save results to CSV
    results_df = pd.DataFrame({
        'Test_Loss': [test_loss],
        'Test_Pearson_Coefficient': [test_prc_coef]
    })
    csv_path = os.path.join(run_dir, 'test_results.csv')
    results_df.to_csv(csv_path, index=False)
    print(f"Results saved to {csv_path}")



In [3]:
if __name__ == "__main__":
    path = "/home/glenn/Downloads/"
    #path = "../DATA/"


    # Model parameters
    model_config = {
        "RespBertCNNModel": {
            'model' : RespBertCNNModel,
            "model_name": "microsoft/wavlm-large",
            "hidden_units": 256,
            "output_size": None  
        }
    }

    # Evaluation parameters
    window_size = 30
    step_size = 25
    batch_size = 4
    
    config = model_config["RespBertCNNModel"]
    #processor = Wav2vec2F.from_pretrained(config["model_name"])
    processor = Wav2Vec2FeatureExtractor.from_pretrained(config["model_name"])

    # Create and initialize model
    model_folder = "/home/glenn/Downloads/pt_runs/pt_runs/"

    # Load the pre-trained model weights
    model_path = model_folder+"Wavml_cnn_full/best_model"  # Update this path


    test_loader, prepared_data_file = prepare_test_datasets(
    path_to_test_data=path+"ComParE2020_Breathing/wav/",
    path_to_test_labels=path+"ComParE2020_Breathing/lab/",
    processor=processor
    )

    # Then run the model inference
    predictions_file = run_model_inference(
        test_loader=test_loader,
        prepared_data_file=prepared_data_file,
        model_path='path/to/model',
        config=config
    )

    # Later, calculate metrics from saved predictions


Test data shape: (160, 480000)


  model.load_state_dict(torch.load(model_path))
Test: 100%|██████████| 40/40 [05:47<00:00,  8.68s/it, test loss: =0.1761]


Saved predictions and data to model_predictions_20241023-190407.npz


In [7]:
predictions_file = "/home/glenn/Documents/GitHub/Master_thesis/1dcnn_breathing_prediction/model_predictions_20241023-190407.npz"
path = "/home/glenn/Downloads/"
#path = "../DATA/"

test_loader, prepared_data_file = prepare_test_datasets(
path_to_test_data=path+"ComParE2020_Breathing/wav/",
path_to_test_labels=path+"ComParE2020_Breathing/lab/",
processor=None
)

calculate_metrics(predictions_file)


Test data shape: (624, 256000)
Saved prepared data to prepared_test_data_20241023-191657.npz


IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices