In [None]:
# SPDX-FileCopyrightText: 2024 Universidad Autónoma de Madrid, Madrid (Spain)
# SPDX-License-Identifier: EUPL-1.2

"""
This script automates the spectral extraction process by defining the wavelength ratio, applying quality filters, and creating spectral groups.
It allows processing multiple (previosuly calibrated) hyperspectral images at once. 

Written by: Claudia Fournier
Revised date: 24.10.2024
"""

# Load the libraries

In [2]:
from spectral import open_image, imshow  # For handling and displaying hyperspectral images
from glob import glob                    # For finding all pathnames matching a specified pattern
import matplotlib.pyplot as plt           # For creating plots and visualizations
import pandas as pd                       # For data handling and manipulation, especially with tabular data
import numpy as np                        # For numerical operations and array manipulation, essential for processing spectral data
import os                                 # For interacting with the operating system, particularly for file paths and directory handling

# Define the funcitons

In [3]:
# flexibilizando el group_sze: con 20 pixels por espectro, y quedándome solo el 3% de los espectros resultantes
# así se mantiene la variabilidad y se suaviza el ruido

def process_image(image_path, output_directory, group_size=None, threshold=1.5, quality_filter_threshold=0.01):
    # Load calibrated image
    img = open_image(image_path)
    
    # Create wavelength array
    wavelengths = np.array(img.metadata['wavelength'], dtype=float)
    
    # Define the wavelengths (find the indices for 751 nm and 676 nm)
    wavelength_751_index = int(np.argmin(np.abs(wavelengths - 751)))
    wavelength_676_index = int(np.argmin(np.abs(wavelengths - 676)))
    
    # Compute the ratio
    ratio_image = img[:, :, wavelength_751_index] / img[:, :, wavelength_676_index]
    
    # Create a mask based on the ratio
    mask = ratio_image > threshold
    
    # Create subdirectory for the image's parent directory
    parent_directory_name = os.path.basename(os.path.dirname(image_path))
    image_output_directory = os.path.join(output_directory, parent_directory_name)
    if not os.path.exists(image_output_directory):
        os.makedirs(image_output_directory)
    
    # Save the original image as PNG
    original_image_filename = os.path.join(image_output_directory, f"{parent_directory_name}_original.png")
    plt.figure(figsize=(10, 8))
    imshow(img, (86, 52, 18))
    plt.savefig(original_image_filename)
    plt.close()
    
    # Save image with mask overlay as PNG
    mask_overlay_filename = os.path.join(image_output_directory, f"{parent_directory_name}_mask_overlay.png")
    plt.figure(figsize=(10, 8))
    imshow(img, (86, 52, 18))
    plt.imshow(mask, cmap='coolwarm', alpha=0.7)
    plt.title(f'Mask Overlay for 751/676 > {threshold}')
    plt.xlabel('')
    plt.ylabel('')
    plt.grid(False)
    plt.colorbar(label=f'Ratio Mask > {threshold}')
    plt.savefig(mask_overlay_filename)
    plt.close()
    
    # Extract spectra using the mask
    indices = np.argwhere(mask)
    spectra_data = []
    for idx in indices:
        x, y = idx[0], idx[1]
        spectra = img[x, y, :]
        spectra_data.append(spectra)
    
    spectra_data = np.array(spectra_data).reshape(len(spectra_data), -1)
    
    # Create DataFrame with spectra_data
    wavelength_columns = [f"{wavelength:.2f}" for wavelength in wavelengths]
    df = pd.DataFrame(spectra_data, columns=wavelength_columns)
    
    # Apply quality filter
    row_means = df.mean(axis=1)
    filtered_df = df[row_means >= quality_filter_threshold].reset_index(drop=True)
    
    # Calculate average spectrum (mean along rows) of filtered data
    avg_spectrum_filtered = filtered_df.mean(axis=0)
    std_spectrum_filtered = filtered_df.std(axis=0)
    
    # Save average spectra plot after filtering as PNG
    average_spectra_plot_filename = os.path.join(image_output_directory, f"{parent_directory_name}_average_spectra_filtered.png")
    plt.figure(figsize=(10, 4))
    plt.plot(wavelengths, avg_spectrum_filtered, label='Average Spectrum After Filtering')
    plt.xlabel('Wavelength (nm)')
    plt.ylabel('Reflectance')
    plt.title('Average Spectrum After Filtering')
    plt.legend()
    plt.grid(True)
    plt.savefig(average_spectra_plot_filename)
    plt.close()
    
    # Save average spectra plot with standard deviation as PNG
    average_spectra_std_plot_filename = os.path.join(image_output_directory, f"{parent_directory_name}_average_spectra_std_filtered.png")
    plt.figure(figsize=(10, 4))
    plt.plot(wavelengths, avg_spectrum_filtered, label='Average Spectrum')
    plt.fill_between(wavelengths, avg_spectrum_filtered - std_spectrum_filtered, avg_spectrum_filtered + std_spectrum_filtered, color='gray', alpha=0.5, label='Standard Deviation')
    plt.xlabel('Wavelength (nm)')
    plt.ylabel('Reflectance')
    plt.title('Average Spectrum with Standard Deviation')
    plt.legend()
    plt.grid(True)
    plt.savefig(average_spectra_std_plot_filename)
    plt.close()
    
    # Determine group_size if not provided
    if group_size is None:
        group_size = len(filtered_df)  # Set to the number of pixels if not specified
    
    # Randomly group and average every 'group_size' rows
    np.random.seed(42)
    num_groups = len(filtered_df) // group_size
    shuffled_df = filtered_df.sample(frac=1).reset_index(drop=True)
    averaged_rows = [shuffled_df.iloc[i*group_size:(i+1)*group_size].mean(axis=0) for i in range(num_groups)]
    averaged_df_shuffled = pd.DataFrame(averaged_rows)

    # Randomly select 3% of the resulting averaged spectra
    top_3_percent_count = int(len(averaged_df_shuffled) * 0.03)  # Calculate 3% of the averaged spectra
    top_3_percent_df = averaged_df_shuffled.sample(n=top_3_percent_count, random_state=42).reset_index(drop=True)

    # Plot and save spectra of the selected 3% rows
    spectra_plot_filename = os.path.join(image_output_directory, f"{parent_directory_name}_selected_3_percent_spectra.png")
    plt.figure(figsize=(10, 4))
    for idx in range(min(100, len(top_3_percent_df))):
        plt.plot(wavelengths, top_3_percent_df.iloc[idx], label=f'Row {idx+1}')
    plt.xlabel('Wavelength (nm)')
    plt.ylabel('Reflectance')
    plt.title('Spectra of Selected 3% Rows')
    plt.grid(True)
    plt.savefig(spectra_plot_filename)
    plt.close()

    # Save the selected 3% DataFrame as CSV in the subdirectory
    csv_filename = os.path.join(image_output_directory, f"{parent_directory_name}_averaged_spectra.csv")
    top_3_percent_df.to_csv(csv_filename, index=False)
    print(f"Selected 3% DataFrame saved as CSV file: {csv_filename}")
    
    # Create report text file
    report_filename = os.path.join(image_output_directory, f"{parent_directory_name}_report.txt")
    with open(report_filename, 'w') as f:
        # Number of pixels within the mask containing biomass
        num_pixels_mask = np.count_nonzero(mask)
        f.write(f"Pixels within the mask containing biomass: {num_pixels_mask}\n")

        # Verify the number of pixels in the mask and rows in the DataFrame
        num_rows_df = len(df)
        if num_pixels_mask == num_rows_df:
            f.write("Number of pixels in mask and number of rows in df are the same.\n")
        else:
            f.write("Number of pixels in mask and number of rows in df are different.\n")

        # Original and filtered DataFrame information
        f.write(f"Original number of rows: {df.shape[0]}\n")
        f.write(f"Number of rows after filtering: {filtered_df.shape[0]}\n")
        rows_removed = df.shape[0] - filtered_df.shape[0]
        f.write(f"{rows_removed} rows were removed.\n")

        # Number of rows in the original and the averaged DataFrame
        f.write(f"Original number of rows: {len(filtered_df)}\n")
        f.write(f"Number of rows after averaging every {group_size} rows: {len(averaged_df_shuffled)}\n")

        # Number of spectra selected by the 3%
        f.write(f"Number of spectra selected by the 3%: {top_3_percent_count}\n")

    print(f"Report saved as TXT file: {report_filename}")

# Extracting and goruping spectra from al calibrated images within a given directory

In [4]:
# Set the base path and define the directory of the calibrated image
base_path = os.getcwd()
default_input_path = os.path.join(base_path, "data", "calibrated")
default_output_path = os.path.join(base_path, "data", "calibrated", "spectral_extraction")

# Directory with calibrated images
input_directory = default_input_path
output_directory = default_output_path

# Process all images in the directory and subdirectories
for root, dirs, files in os.walk(input_directory):
    for file in files:
        if file.endswith('.hdr'):
            image_path = os.path.join(root, file)
            process_image(image_path, output_directory, group_size=20)



Selected 3% DataFrame saved as CSV file: C:\Users\cfour\Desktop\git_projects\hyper_mix\code_data_publication\hyperspectral_analysis\notebooks\data\calibrated\spectral_extraction\003\003_averaged_spectra.csv
Report saved as TXT file: C:\Users\cfour\Desktop\git_projects\hyper_mix\code_data_publication\hyperspectral_analysis\notebooks\data\calibrated\spectral_extraction\003\003_report.txt


<Figure size 1000x800 with 0 Axes>

<Figure size 1000x800 with 0 Axes>