# General Setup

In [None]:
import os

# GPU selection. These variables are set before `visualdl` is imported below;
# keep that ordering (presumably the CUDA runtime reads them once at
# initialisation — TODO confirm against issue #152).
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"  # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  # restrict this kernel to GPU index 1

from visualdl import vdl
import cv2
import numpy as np

# Load the trained checkpoint as an inference model. `pred` is reused by the
# metric and model-inspection cells further down.
pred = vdl.get_inference_model(r"C:\Users\philmarq\source\repos\VisualDL\megfinalglyphe\efficientnet-b7, UnetPlusPlus.pt")


## Compute segmentation metrics with a VisualDL model (optional)

In [4]:
import numpy as np
import cv2
import os
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score
import seaborn as sns
import matplotlib.pyplot as plt

def sliding_window_inference(model, image, window_size=1024, overlap=0.5, num_classes=2):
    """Run tiled inference over ``image`` and return a per-pixel class map.

    The image is mirrored at its bottom/right edges up to the next multiple of
    ``window_size``, square windows are fed to ``model.predict`` one at a
    time, the returned logits are averaged wherever windows overlap, and the
    class with the highest averaged logit is selected for every pixel.

    Args:
        model: Object exposing ``predict(list_of_images,
            single_class_per_contour=False, return_logits=True)``. The result
            is indexed as ``[class, row, col]`` for a single window.
        image: Array of shape ``(H, W)`` or ``(H, W, C)``.
        window_size: Side length in pixels of each square window.
        overlap: Fraction of a window shared with its neighbour; must yield a
            stride between 1 and ``window_size`` pixels (``0 <= overlap < 1``).
        num_classes: Number of logit channels the model returns per window.

    Returns:
        ``(H, W)`` integer array of class indices.

    Raises:
        ValueError: If ``window_size`` and ``overlap`` give a stride outside
            ``[1, window_size]``. Such strides previously crashed inside
            ``range`` or left parts of the image without any prediction.
    """
    stride = int(window_size * (1 - overlap))
    if not 1 <= stride <= window_size:
        raise ValueError(
            f"window_size={window_size} with overlap={overlap} gives a stride of "
            f"{stride} px; the stride must lie in [1, window_size]."
        )

    h, w = image.shape[:2]

    # Accumulators: summed logits per class and number of windows per pixel.
    logits_acc = np.zeros((num_classes, h, w), dtype=np.float32)
    count_acc = np.zeros((h, w), dtype=np.float32)

    # Mirror-pad bottom/right to a multiple of the window size. numpy's
    # "symmetric" mode mirrors including the edge pixel, i.e. the same layout
    # as OpenCV's BORDER_REFLECT.
    pad_h = (window_size - h % window_size) % window_size
    pad_w = (window_size - w % window_size) % window_size
    pad_width = [(0, pad_h), (0, pad_w)] + [(0, 0)] * (image.ndim - 2)
    padded_image = np.pad(image, pad_width, mode="symmetric")
    padded_h, padded_w = padded_image.shape[:2]

    # A window starting too close to the padded border is shifted back so it
    # still fits, instead of being dropped. Dropping it left the bottom/right
    # strip unpredicted whenever the stride did not divide the window size.
    # The set removes starts that collapse onto an existing window.
    y_starts = sorted({min(y, padded_h - window_size) for y in range(0, h, stride)})
    x_starts = sorted({min(x, padded_w - window_size) for x in range(0, w, stride)})

    for y in y_starts:
        y_end = min(y + window_size, h)
        for x in x_starts:
            x_end = min(x + window_size, w)
            window = padded_image[y:y + window_size, x:x + window_size]

            window_pred = model.predict([window], single_class_per_contour=False, return_logits=True)

            # Only the part of the window that lies inside the original
            # (unpadded) image contributes to the result.
            logits_acc[:, y:y_end, x:x_end] += window_pred[:, :y_end - y, :x_end - x]
            count_acc[y:y_end, x:x_end] += 1

    # Average the logits; the floor of 1 guards against division by zero.
    logits_acc /= np.maximum(count_acc, 1)

    return np.argmax(logits_acc, axis=0)

def create_overlay(original_img, mask, alpha=0.5, color=[255, 0, 0]):
    """Blend a solid-colour rendering of ``mask`` onto ``original_img``.

    Pixels where ``mask > 0`` are painted with ``color`` on an otherwise black
    canvas; that canvas is then mixed with the image using weights
    ``1 - alpha`` (image) and ``alpha`` (canvas).
    """
    selected = mask > 0
    tint_layer = np.zeros_like(original_img)
    tint_layer[selected] = color
    image_weight = 1 - alpha
    blended = cv2.addWeighted(original_img, image_weight, tint_layer, alpha, 0)
    return blended

def calculate_iou(pred, target):
    """Return the intersection-over-union of two masks.

    Any non-zero element counts as foreground. A small constant (1e-6) is
    added to the denominator so two empty masks yield 0 instead of a
    division by zero.
    """
    both = np.logical_and(pred, target)
    either = np.logical_or(pred, target)
    overlap_px = both.sum()
    combined_px = either.sum()
    return overlap_px / (combined_px + 1e-6)

def plot_confusion_matrix(cm, output_path):
    """Render ``cm`` as an annotated seaborn heatmap and save it to ``output_path``.

    The figure is closed after saving so that calling this in a loop does not
    accumulate open figures.
    """
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
    ax.set_title('Confusion Matrix')
    ax.set_ylabel('True Label')
    ax.set_xlabel('Predicted Label')
    fig.savefig(output_path)
    plt.close(fig)

def calculate_metrics(y_true, y_pred):
    """Compute binary segmentation metrics for one image.

    Both inputs are binarised with ``> 0`` (any non-zero value is foreground)
    and compared pixel by pixel.

    Args:
        y_true: Ground-truth mask array.
        y_pred: Predicted mask array with the same number of elements.

    Returns:
        dict with the float entries ``'IoU'``, ``'Precision'``, ``'Recall'``,
        ``'F1-Score'``, ``'Accuracy'``, ``'Specificity'`` and the 2x2 integer
        array ``'Confusion Matrix'`` laid out as ``[[tn, fp], [fn, tp]]``.
    """
    # Convert to binary foreground/background masks
    y_true_bin = y_true > 0
    y_pred_bin = y_pred > 0

    # Flatten arrays for the scikit-learn metric functions
    y_true_flat = y_true_bin.flatten()
    y_pred_flat = y_pred_bin.flatten()

    # Pin both labels explicitly. Without `labels`, an image whose truth and
    # prediction contain a single class yields a 1x1 matrix and the 4-way
    # unpacking below raises ValueError.
    cm = confusion_matrix(y_true_flat, y_pred_flat, labels=[False, True])
    iou = calculate_iou(y_pred_bin, y_true_bin)

    # zero_division=0 returns the same 0.0 scikit-learn falls back to by
    # default for undefined scores, but without one warning per image.
    precision = precision_score(y_true_flat, y_pred_flat, zero_division=0)
    recall = recall_score(y_true_flat, y_pred_flat, zero_division=0)
    f1 = f1_score(y_true_flat, y_pred_flat, zero_division=0)

    # Remaining metrics are derived directly from the confusion matrix
    tn, fp, fn, tp = cm.ravel()
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    specificity = tn / (tn + fp + 1e-6)  # epsilon guards images without any negatives

    return {
        'IoU': iou,
        'Precision': precision,
        'Recall': recall,
        'F1-Score': f1,
        'Accuracy': accuracy,
        'Specificity': specificity,
        'Confusion Matrix': cm
    }

# Main execution
# Evaluates the loaded model on every image of the validation set, writes
# per-image and aggregate metrics, one confusion-matrix plot per image and a
# side-by-side visualisation (prediction | label | prediction overlay | label overlay).
base = r"C:/Users/philmarq/source/repos/Daten/datasetmegfinal/dataset/valid/images"
label_base = r"C:/Users/philmarq/source/repos/Daten/datasetmegfinal/dataset/valid/labels"
output_dir = "outputallsecond"
os.makedirs(output_dir, exist_ok=True)

# Multiplier used ONLY to make small class indices visible as grey levels in
# the saved images; metrics are computed on the unscaled masks.
vis_scale = 50

# Initialize aggregated metrics
all_metrics = []

# Create metric output files
metrics_file = os.path.join(output_dir, 'metrics.txt')
aggregate_metrics_file = os.path.join(output_dir, 'aggregate_metrics.txt')

# Start each run with an empty per-image report. The loop appends to it, so
# without this reset re-running the cell would duplicate every entry.
with open(metrics_file, 'w'):
    pass

# Sorted so the report order is the same on every run and platform.
for file in sorted(os.listdir(base)):
    image_path = os.path.join(base, file)
    label_path = os.path.join(label_base, file)

    # Decode the image once; cv2.imread returns None (no exception) for
    # unreadable or missing files, so check before using the arrays.
    orig_img = cv2.imread(image_path)
    label = cv2.imread(label_path, cv2.IMREAD_GRAYSCALE)
    if orig_img is None or label is None:
        print(f"Skipping {file}: image or label could not be read.")
        continue
    if label.shape[:2] != orig_img.shape[:2]:
        print(f"Skipping {file}: label size {label.shape[:2]} does not match image size {orig_img.shape[:2]}.")
        continue

    img = cv2.cvtColor(orig_img, cv2.COLOR_BGR2RGB)
    preds = sliding_window_inference(pred, img, window_size=1024, overlap=0.0)

    # Metrics use the raw masks. Scaling the uint8 label in place first would
    # wrap around (e.g. 128 * 50 == 0 mod 256) and turn foreground into background.
    metrics = calculate_metrics(label, preds)
    all_metrics.append(metrics)

    plot_confusion_matrix(
        metrics['Confusion Matrix'],
        os.path.join(output_dir, f'confusion_matrix_{os.path.splitext(file)[0]}.png')
    )

    with open(metrics_file, 'a') as f:
        f.write(f"\nMetrics for {file}:\n")
        for metric_name, value in metrics.items():
            if metric_name != 'Confusion Matrix':
                f.write(f"{metric_name}: {value:.4f}\n")
        f.write("-" * 50 + "\n")

    # NOTE(review): orig_img is in OpenCV's BGR channel order, so [255, 0, 0]
    # renders as blue and [0, 255, 0] as green — confirm that is the intent.
    pred_overlay = create_overlay(orig_img.copy(), preds > 0, alpha=0.25, color=[255, 0, 0])
    truth_overlay = create_overlay(orig_img.copy(), label > 0, alpha=0.25, color=[0, 255, 0])

    # Grey-level renderings of the masks; widen before scaling and clip so
    # large label values saturate instead of wrapping.
    preds_vis = np.clip(preds.astype(np.int32) * vis_scale, 0, 255).astype(np.uint8)
    label_vis = np.clip(label.astype(np.int32) * vis_scale, 0, 255).astype(np.uint8)
    preds_colored = cv2.cvtColor(preds_vis, cv2.COLOR_GRAY2BGR)
    label_colored = cv2.cvtColor(label_vis, cv2.COLOR_GRAY2BGR)

    combined_img = cv2.hconcat([preds_colored, label_colored, pred_overlay, truth_overlay])
    cv2.imwrite(os.path.join(output_dir, file), combined_img)


with open(aggregate_metrics_file, 'w') as f:
    f.write("Aggregate Metrics (Mean ± Std):\n")
    if all_metrics:
        for metric in ['IoU', 'Precision', 'Recall', 'F1-Score', 'Accuracy', 'Specificity']:
            values = [m[metric] for m in all_metrics]
            mean_val = np.mean(values)
            std_val = np.std(values)
            f.write(f"{metric}: {mean_val:.4f} ± {std_val:.4f}\n")
    else:
        # Avoid writing NaN statistics when nothing could be evaluated.
        f.write("No images were processed.\n")

print("Processing complete. Check the output directory for results.")

Processing complete. Check the output directory for results.


# Extract information from model

## Print all custom data stored in the model

In [7]:
# Display the metadata dict stored under "custom_data" in the loaded model's state.
pred.state["custom_data"]

{'structure_indices': [2, 3],
 'image_size': 1024,
 'modeltype': 'Full Image',
 'object_based': False,
 'physical_tile_size': '(226.388852600035',
 '226.388852600035)': None,
 'project_type': 'dummy',
 'pyramid_level': 0,
 'datetime': '21/01/2022 15:33',
 'structures': 'Glomerulus',
 'objects_count': 312,
 'model': "[{'backbone': 'efficientnet-b7', 'decoder': 'UnetPlusPlus'}]",
 'files': {'File': ['05_.czi',
   '06_.czi',
   '07_.czi',
   '12_.czi',
   '13_.czi',
   '14_.czi',
   '17_.czi',
   '18_.czi',
   '19_.czi',
   '05_.czi',
   '06_.czi',
   '07_.czi',
   '12_.czi',
   '13_.czi',
   '14_.czi',
   '17_.czi',
   '18_.czi',
   '19_.czi'],
  'Scene': [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]},
 'calculate_weight_map': False}

## Access the string that actually creates the model

In [10]:
# Display the stored expression string used to construct the network architecture.
pred.state["model"]

'smp.create_model(arch="UnetPlusPlus", encoder_name="efficientnet-b7", encoder_weights="imagenet", in_channels=3, classes = 2)'

## access metrics

In [11]:
# List the top-level entries available in the model state.
pred.state.keys()

dict_keys(['epoch', 'model_state_dict', 'optimizer_state_dict', 'model', 'has_distance_map', 'validation_metrics', 'train_metrics', 'custom_data'])

In [12]:
# Display the metrics stored under "train_metrics" in the model state.
pred.state["train_metrics"]

{'Accuracy': 0.9950659871101379,
 'IoU': 0.9320716857910156,
 'train_loss': 0.018424429236223328}

In [13]:
# Display the metrics stored under "validation_metrics" in the model state.
pred.state["validation_metrics"]

{'Accuracy': 0.9778851866722107,
 'IoU': 0.7717246413230896,
 'validation_monitor_metric': 0.7717246413230896,
 'validation_loss': 0.11256959306245501}

# Convert to YAML

In [17]:
import yaml
from typing import Dict, Any
import json
def convert_data_to_yaml(
    custom_data: Dict[str, Any],
    training_info: Dict[str, Any],
    training_settings: Dict[str, Any]
) -> str:
    """
    Convert model data to YAML format using custom_data and two additional JSON objects.

    Values present in ``training_info`` / ``training_settings`` are used as
    they are; for anything missing, a placeholder (``'No Information!'``), a
    built-in default, or a value derived from ``custom_data`` is substituted.
    Keys are emitted in the order they are defined here (no sorting).

    Args:
        custom_data (Dict[str, Any]): Dictionary containing model custom data
        training_info (Dict[str, Any]): Dictionary containing training information
        training_settings (Dict[str, Any]): Dictionary containing training settings

    Returns:
        str: Formatted YAML string
    """
    missing = 'No Information!'

    # Values that appear in more than one place of the document.
    model_name = training_settings.get('meta_data', {}).get('name', missing)
    creation_date = training_info.get('creation_date', missing)
    image_size = custom_data.get('image_size', missing)

    # Report the class count only when the indices are actually known. Taking
    # len() of a placeholder list claimed "1 class" whenever the information
    # was absent.
    structure_indices = custom_data.get('structure_indices')
    number_of_classes = len(structure_indices) if structure_indices is not None else missing

    # Fallback sections, used only when the corresponding key is absent from
    # the input dictionaries.
    default_system_information = {
        'os': missing,
        'cpu': missing,
        'gpu': missing,
        'ram': missing
    }
    default_meta_data = {
        'name': missing,
        'version': '',
        'description': '',
        'is_new_model': True
    }
    default_dataset_parameters = {
        'dataset_approach': 2,
        'dataset_type': 0,
        'dataset_only': False,
        'use_existing_dataset': False
    }
    default_training_parameters = {
        'epochs': 100,
        'early_stopping': 500,
        'batch_size': 2,
        'metrics': [0],
        'loss_functions': [0, 1],
        'optimizer': 1,
        'learning_rate': 0.0001
    }
    default_model_parameters = {
        'description': missing,
        'version': missing,
        'creation_date': missing,
        'completion_date': missing,
        'dataset_type': 0,
        'input_channels': 3,
        'fluorescence_channels': [],
        'spatial_dims': 2,
        'image_width': image_size,
        'image_height': image_size,
        'number_of_classes': number_of_classes,
        'pyramid_level': custom_data.get('pyramid_level', missing)
    }

    yaml_dict = {
        'model_name': model_name,
        'trainer_name': missing,
        'training_date': creation_date,
        'model_metadata': {
            'name': model_name,
            'lib': 'VisualDL',
            'training_information': {
                'creation_date': creation_date,
                'completion_date': training_info.get('completion_date', missing),
                'system_information': training_info.get('system_information', default_system_information),
                'training_status': training_info.get('training_status', missing),
                'error_message': training_info.get('error_message', '')
            },
            'training_settings': {
                'project_id': training_settings.get('project_id', missing),
                # The settings file wins; the checkpoint's own value is the fallback.
                'project_type': training_settings.get('project_type', custom_data.get('project_type', missing)),
                'meta_data': training_settings.get('meta_data', default_meta_data),
                'dataset_parameters': training_settings.get('dataset_parameters', default_dataset_parameters),
                'training_parameters': training_settings.get('training_parameters', default_training_parameters),
                'model_parameters': training_settings.get('model_parameters', default_model_parameters)
            },
            'structures': training_settings.get('structures', []),
            'training_computer_name': 'Unknown',
            'raw_custom_data': custom_data
        },
        'hsa_kit_metadata': {
            'version': '0.0.0.0',
            'hsa_user_name': 'Unknown'
        },
        'gtds': [],
        'description_version': custom_data.get('datetime', missing)
    }

    # sort_keys=False keeps the section order defined above.
    return yaml.dump(yaml_dict, default_flow_style=False, sort_keys=False)

In [18]:
# Load the checkpoint only to read the "custom_data" metadata from its state.
custom_data = vdl.get_inference_model(r"C:\Users\philmarq\source\repos\VisualDL\custom_experiments\HyperNetNonVessel\001.pt").state["custom_data"]


# Training information stored as JSON next to the checkpoint.
with open(r"C:\Users\philmarq\source\repos\VisualDL\custom_experiments\HyperNetNonVessel\training_information.json", 'r') as file:
    training_information = json.load(file)  
    
# Training settings stored as JSON next to the checkpoint.
with open(r"C:\Users\philmarq\source\repos\VisualDL\custom_experiments\HyperNetNonVessel\training_settings_config.json", 'r') as file:
    training_settings_config = json.load(file)  

In [19]:
# Build the YAML text from the checkpoint metadata and the two JSON dictionaries.
yml = convert_data_to_yaml(custom_data, training_information, training_settings_config)

In [20]:
# Display the generated YAML string (shown as a repr, so newlines appear escaped).
yml

'model_name: HyperNetNonVessel\ntrainer_name: No Information!\ntraining_date: \'2023-02-14T14:26:00\'\nmodel_metadata:\n  name: HyperNetNonVessel\n  lib: VisualDL\n  training_information:\n    creation_date: \'2023-02-14T14:26:00\'\n    completion_date: \'2023-02-14T14:26:00\'\n    system_information:\n      os: No Information!\n      cpu: No Information!\n      gpu: No Information!\n      ram: No Information!\n    training_status: finished\n    error_message: \'\'\n  training_settings:\n    project_id: 00000000-0000-0000-0000-000000000000\n    project_type: AnnotationTool (Generic)\n    meta_data:\n      name: HyperNetNonVessel\n      version: \'\'\n      description: \'\'\n      is_new_model: true\n    dataset_parameters:\n      dataset_approach: 2\n      dataset_type: 2\n      dataset_only: false\n      use_existing_dataset: false\n    training_parameters:\n      epochs: 100\n      early_stopping: 500\n      batch_size: 2\n      metrics:\n      - 0\n      loss_functions:\n      - 0\

In [21]:
# Write the YAML text to output.yaml in the current working directory,
# replacing any existing file of that name.
with open('output.yaml', 'w') as file:
    file.write(yml)