# PyRadiomics Feature Extraction Pipeline

## Overview
This notebook provides a batch processing pipeline for extracting radiomics features from medical images using PyRadiomics.

## Requirements
- pyradiomics
- pandas
- numpy
- SimpleITK (installed with pyradiomics)
- openpyxl (needed by pandas to write the `.xlsx` output)

## Input Data Structure
```
data/
├── images/          # Original medical images (.nii.gz, .nrrd, etc.)
└── masks/           # Segmentation masks (.nii.gz, .nrrd, etc.)
```

## Output
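- Excel file containing extracted radiomics features for each sample
- CSV file (`failed_cases.csv`) listing the samples whose extraction failed, written only when failures occur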

## 1. Import Libraries

In [None]:
import os

import numpy as np
import pandas as pd
import radiomics
from radiomics import featureextractor

# The PyPI distribution is called "pyradiomics", but the importable package is
# "radiomics"; the version string is read from that module.
print(f"PyRadiomics version: {radiomics.__version__}")

## 2. Configuration

**Please modify the following parameters according to your data:**

In [None]:
# =============================================================================
# Configuration - Please modify according to your data
# =============================================================================

# Path to PyRadiomics parameter file (YAML)
PARAMS_FILE = '../config/extraction_params.yaml'

# Input directories
IMAGE_DIR = '../data/images/'     # Directory containing original images
MASK_DIR = '../data/masks/'       # Directory containing segmentation masks

# Output settings: the feature table is written to OUTPUT_DIR/OUTPUT_FILENAME
OUTPUT_DIR = 'results/'
OUTPUT_FILENAME = 'extracted_features.xlsx'

# Whether to include provenance information (diagnostics)
INCLUDE_PROVENANCE = False

# Create output directory if not exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

## 3. Initialize Feature Extractor

In [None]:
# Create the extractor, then load the settings stored in the YAML parameter
# file referenced by PARAMS_FILE.
extractor = featureextractor.RadiomicsFeatureExtractor()
extractor.loadParams(PARAMS_FILE)

# Switch provenance reporting off unless INCLUDE_PROVENANCE asks for it
# (presumably this removes the diagnostic entries from each result; confirm
# against the PyRadiomics documentation).
if not INCLUDE_PROVENANCE:
    extractor.addProvenance(False)

# Report which features the extractor has enabled after loading the parameters
print("Feature extractor initialized successfully!")
print(f"Enabled features: {extractor.enabledFeatures}")

## 4. List Input Files

In [None]:
# Read both input directories; sorting keeps the listings reproducible
image_files = sorted(os.listdir(IMAGE_DIR))
mask_files = sorted(os.listdir(MASK_DIR))

print(f"Found {len(image_files)} images")
print(f"Found {len(mask_files)} masks")

# Preview at most five entries of each listing
for heading, listing in (
    ("\nFirst 5 images:", image_files),
    ("\nFirst 5 masks:", mask_files),
):
    print(heading)
    for f in listing[:5]:
        print(f"  - {f}")

## 5. Validate Image-Mask Pairing

Ensure that each image has a corresponding mask file.

In [None]:
def get_sample_id(filename, suffix_to_remove=None):
    """
    Derive a sample ID from a file name.

    The first matching image extension is stripped, then an optional trailing
    suffix. Adapt this function to your own naming convention.

    Parameters
    ----------
    filename : str
        File name to convert.
    suffix_to_remove : str, optional
        Trailing text to strip after the extension has been removed.

    Returns
    -------
    str : Sample ID
    """
    # Order matters: '.nii.gz' must be tried before the shorter '.nii'
    known_extensions = ('.nii.gz', '.nii', '.nrrd', '.mha', '.mhd')
    matched_ext = next(
        (ext for ext in known_extensions if filename.endswith(ext)), None
    )
    stem = filename if matched_ext is None else filename[:-len(matched_ext)]

    # Strip the caller-supplied suffix only when it is actually present
    if suffix_to_remove and stem.endswith(suffix_to_remove):
        stem = stem[:-len(suffix_to_remove)]

    return stem


def _contains_id(needle, haystack):
    """
    Return True if ``needle`` occurs in ``haystack`` without splitting a number.

    An occurrence is rejected when ``needle`` ends (or starts) with a digit and
    the neighbouring character in ``haystack`` is also a digit, so ``patient1``
    does not match ``patient10_mask``. An empty ``needle`` never matches.
    """
    if not needle:
        return False

    start = haystack.find(needle)
    while start != -1:
        end = start + len(needle)
        splits_left = (
            start > 0 and needle[0].isdigit() and haystack[start - 1].isdigit()
        )
        splits_right = (
            end < len(haystack)
            and needle[-1].isdigit()
            and haystack[end].isdigit()
        )
        if not (splits_left or splits_right):
            return True
        # This occurrence cut through a number; look for a later one
        start = haystack.find(needle, start + 1)
    return False


def validate_pairing(image_files, mask_files):
    """
    Pair each image file with a mask file based on their sample IDs.

    An exact sample-ID match is preferred. Otherwise the first mask whose ID
    contains the image ID (or the reverse) is used, provided the overlap does
    not cut through a number.

    Parameters
    ----------
    image_files : list of str
        Image file names.
    mask_files : list of str
        Mask file names.

    Returns
    -------
    valid_pairs : list of tuple
        ``(image_file, mask_file)`` for every image that found a mask.
    unmatched_images : list of str
        Image files for which no mask was found.
    """
    valid_pairs = []
    unmatched_images = []

    # Sample ID -> mask file; a later mask with the same ID replaces an earlier one
    mask_lookup = {get_sample_id(m): m for m in mask_files}

    for img in image_files:
        img_id = get_sample_id(img)

        # Exact ID match wins over any partial match
        mask_file = mask_lookup.get(img_id)

        if mask_file is None:
            # Fall back to containment in either direction, e.g. image
            # "case7" with mask "case7_mask"
            mask_file = next(
                (
                    candidate
                    for mask_id, candidate in mask_lookup.items()
                    if _contains_id(img_id, mask_id)
                    or _contains_id(mask_id, img_id)
                ),
                None,
            )

        if mask_file is None:
            unmatched_images.append(img)
        else:
            valid_pairs.append((img, mask_file))

    return valid_pairs, unmatched_images


# Pair the listed images with the listed masks; images for which no mask was
# found come back separately in `unmatched`.
valid_pairs, unmatched = validate_pairing(image_files, mask_files)

print(f"Valid image-mask pairs: {len(valid_pairs)}")
if unmatched:
    print(f"\nWarning: {len(unmatched)} images without matching masks:")
    # Only the first five unmatched images are printed to keep the output short
    for img in unmatched[:5]:
        print(f"  - {img}")

## 6. Batch Feature Extraction

In [None]:
def extract_features_batch(extractor, image_dir, mask_dir, pairs):
    """
    Extract radiomics features for all image-mask pairs.

    A pair whose extraction raises is reported, recorded in the failure list
    and skipped; the remaining pairs are still processed.

    Parameters
    ----------
    extractor : RadiomicsFeatureExtractor
        Initialized feature extractor; only ``execute(image_path, mask_path)``
        is called on it.
    image_dir : str
        Directory containing images
    mask_dir : str
        Directory containing masks
    pairs : list of tuple
        List of (image_file, mask_file) pairs

    Returns
    -------
    df : pd.DataFrame
        One row per successfully processed pair: the extracted features plus
        ``IMAGE`` and ``MASK`` columns holding the file names. Empty when no
        pair succeeded.
    failed : list of tuple
        ``(image_file, error_message)`` for every pair that raised.
    """
    results = []
    failed = []

    total = len(pairs)

    for done, (img_file, mask_file) in enumerate(pairs, start=1):
        img_path = os.path.join(image_dir, img_file)
        mask_path = os.path.join(mask_dir, mask_file)

        try:
            # Extract features
            feature_vector = extractor.execute(img_path, mask_path)

            # Convert to DataFrame row, tagged with its source files
            row = pd.DataFrame([feature_vector])
            row['IMAGE'] = img_file
            row['MASK'] = mask_file
        except Exception as e:
            # Deliberately broad: one bad sample must not abort the whole batch
            print(f"Error processing {img_file}: {e}")
            failed.append((img_file, str(e)))
        else:
            results.append(row)

        # Progress covers every attempted pair, so a failure on a reporting
        # boundary (every 10th or the last pair) still prints the line
        if done % 10 == 0 or done == total:
            print(f"Progress: {done}/{total} ({done/total*100:.1f}%)")

    # Combine all results
    if results:
        df = pd.concat(results, ignore_index=True)
    else:
        df = pd.DataFrame()

    return df, failed

In [None]:
# Run the batch extraction over every validated image-mask pair
print("Starting feature extraction...")
print("=" * 60)

# features_df holds one row per successful pair; failed_cases holds
# (image_file, error_message) tuples for the pairs that raised
features_df, failed_cases = extract_features_batch(
    extractor, IMAGE_DIR, MASK_DIR, valid_pairs
)

# Outcome counts
print("\n" + "=" * 60)
print("Extraction completed!")
print(f"Successfully processed: {len(features_df)} samples")
print(f"Failed: {len(failed_cases)} samples")

## 7. Preview Results

In [None]:
# Feature columns are everything except the bookkeeping IMAGE and MASK columns
feature_cols = [col for col in features_df.columns if col not in ['IMAGE', 'MASK']]

print(f"Extracted features shape: {features_df.shape}")
# Count the feature columns directly: subtracting 2 from the column count
# would report -2 when no sample was extracted and the DataFrame is empty
print(f"Number of features: {len(feature_cols)}")

# Show feature categories
print("\nFeature categories:")

# Group by the text before the first underscore; columns without an
# underscore are not counted
categories = {}
for col in feature_cols:
    if '_' in col:
        prefix = col.split('_')[0]
        categories[prefix] = categories.get(prefix, 0) + 1

for cat, count in sorted(categories.items()):
    print(f"  {cat}: {count} features")

In [None]:
# Preview the first few rows of the feature table.
# NOTE(review): `display` is not imported in this file; presumably it is the
# helper provided by the Jupyter/IPython session. Confirm before running this
# as a plain script.
display(features_df.head())

## 8. Save Results

In [None]:
# Write the feature table to OUTPUT_DIR/OUTPUT_FILENAME as an Excel file,
# without the DataFrame index
output_path = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
features_df.to_excel(output_path, index=False)
print(f"Features saved to: {output_path}")

# Save failed cases if any: one row per failed image with its error message
if failed_cases:
    failed_df = pd.DataFrame(failed_cases, columns=['Image', 'Error'])
    failed_path = os.path.join(OUTPUT_DIR, 'failed_cases.csv')
    failed_df.to_csv(failed_path, index=False)
    print(f"Failed cases saved to: {failed_path}")

## 9. Summary Report

In [None]:
# Final report: inputs, outcome counts, feature breakdown and output location
print("\n" + "=" * 70)
print("              Feature Extraction Summary Report")
print("=" * 70)

# Where the data and settings came from
print("\n[Input Data]")
print(f"  - Image directory: {IMAGE_DIR}")
print(f"  - Mask directory: {MASK_DIR}")
print(f"  - Parameter file: {PARAMS_FILE}")

# How many pairs were attempted and how they ended
print("\n[Processing Results]")
print(f"  - Total pairs found: {len(valid_pairs)}")
print(f"  - Successfully processed: {len(features_df)}")
print(f"  - Failed: {len(failed_cases)}")

# Feature totals, reusing the per-category counts from the preview cell
print("\n[Extracted Features]")
print(f"  - Total features: {len(feature_cols)}")
for cat, count in sorted(categories.items()):
    print(f"    - {cat}: {count}")

# Location of the saved feature table
print("\n[Output]")
print(f"  - Results saved to: {output_path}")

print("\n" + "=" * 70)

---

## Appendix: Single Sample Extraction (Optional)

Use this section to test extraction on a single sample before batch processing.

In [None]:
# # Uncomment to test single sample extraction
# 
# # Specify single image and mask
# test_image = os.path.join(IMAGE_DIR, image_files[0])
# test_mask = os.path.join(MASK_DIR, mask_files[0])
# 
# print(f"Testing extraction on:")
# print(f"  Image: {test_image}")
# print(f"  Mask: {test_mask}")
# 
# # Extract features
# test_features = extractor.execute(test_image, test_mask)
# 
# # Display results
# print(f"\nExtracted {len(test_features)} features")
# for key, value in list(test_features.items())[:10]:
#     print(f"  {key}: {value}")