This notebook averages files together: files are grouped using the fnmatch patterns in the variable averaging_patterns, and
the files in each group are averaged into a single output file. Specify the fnmatch patterns in the last cell at the bottom of this notebook.

REVISIT: Should I average PDIs or just CSVs?
REVISIT: I want to move the dat file and metadata reading function to a different directory. Would helper_code be good?
- It can help us with plotting, file averaging, and ensuring accuracy.



In [2]:
from pathlib import Path
import numpy as np
import fnmatch

In [None]:


def group_files_by_fnmatch_patterns(file_list: list[Path], pattern_groups: list) -> dict:
    """
    Groups files based on fnmatch pattern groups (similar to Step1 notebook approach)
    Input: file_list: List of file paths. Only each path's file name (not its
           parent directories) is compared against the patterns.
           pattern_groups: List of pattern lists (e.g., [["*Run6*RampT*"], ["*Run7*RampT*"]]).
           Each inner list corresponds to a single group; a file joins the group
           when its name matches any pattern in that list.
    Output: Dictionary mapping the stem (file name without extension) of each
            group's first matched file to the list of file paths in that group

    A file is assigned to at most one group: the first group, in pattern_groups
    order, that matches it claims it. Groups that match no files are reported
    and left out of the result; files that match no group are listed.
    """
    groups = {}
    matched_files = set()  # Files already claimed by a group, so none is used twice

    for group_number, patterns in enumerate(pattern_groups, start=1):
        group_files = []

        # Pattern-major order: all files matching the first pattern come first
        for pattern in patterns:
            for file_path in file_list:
                if file_path in matched_files:
                    continue
                if fnmatch.fnmatch(file_path.name, pattern):
                    group_files.append(file_path)
                    matched_files.add(file_path)

        if not group_files:
            print(f"Warning: group {group_number} {patterns} did not match any files")
            continue

        # The group is named after its first matched file, without extension
        groups[group_files[0].stem] = group_files

    unmatched_files = [f for f in file_list if f not in matched_files]
    if unmatched_files:
        print(f"{len(unmatched_files)} files did not match any patterns:")
        for unmatched in unmatched_files:
            print(f"  {unmatched.name}")

    return groups

def average_dat_files(file_paths: list) -> tuple:
    """
    Averages multiple .dat files using simple column averaging
    Input: List of .dat file paths to average
    Output: (header_lines, avg_q, avg_intensity, avg_sigma, averaged_metadata)
            header_lines is taken from the first file; the three arrays are the
            element-wise means across files; averaged_metadata maps every
            metadata key to the mean of its values, in first-seen key order.
    Raises: ValueError if file_paths is empty, if a file's data columns differ
            in length from the first file's, or if a metadata key found in one
            file is missing from another.

    NOTE(review): sigma is averaged arithmetically like the other columns, not
    propagated in quadrature, and q grids are averaged without checking that
    they agree - confirm both are intended.
    """
    if not file_paths:
        raise ValueError("No files to average")

    header_lines = None
    expected_shapes = None
    all_q, all_intensity, all_sigma, all_metadata = [], [], [], []

    for file_index, file_path in enumerate(file_paths):
        file_header, q_vals, intensity_vals, sigma_vals, metadata = read_dat_data_metadata(file_path)

        # Fail early with a clear message instead of letting NumPy build (or
        # reject) a ragged array when the files have different row counts
        shapes = tuple(np.shape(column) for column in (q_vals, intensity_vals, sigma_vals))
        if file_index == 0:
            header_lines = file_header  # Only the first file's header is kept
            expected_shapes = shapes
        elif shapes != expected_shapes:
            raise ValueError(
                f"Cannot average {file_path}: column shapes {shapes} differ from "
                f"{expected_shapes} in {file_paths[0]}"
            )

        all_q.append(q_vals)
        all_intensity.append(intensity_vals)
        all_sigma.append(sigma_vals)
        all_metadata.append(metadata)

    # Simple averaging across files (axis=0)
    avg_q = np.mean(np.array(all_q), axis=0)
    avg_intensity = np.mean(np.array(all_intensity), axis=0)
    avg_sigma = np.mean(np.array(all_sigma), axis=0)

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # metadata is written in the same order on every run (a set would not)
    all_keys = dict.fromkeys(key for metadata in all_metadata for key in metadata)

    averaged_metadata = {}
    for key in all_keys:
        values = []
        for file_path, metadata in zip(file_paths, all_metadata):
            if key not in metadata:
                # Every file must provide every key, otherwise the mean is not comparable
                raise ValueError(f"Metadata key '{key}' missing from file {file_path}")
            values.append(metadata[key])
        averaged_metadata[key] = np.mean(values)

    return header_lines, avg_q, avg_intensity, avg_sigma, averaged_metadata


def save_averaged_data(header_lines: list, q, intensity, sigma, averaged_metadata: dict, output_path: Path):
    """
    Writes one averaged dataset to output_path as a .dat text file

    Layout, top to bottom: the header lines (one per line), one row per data
    point holding q, intensity and sigma in exponent notation, and finally - only
    when averaged_metadata is non-empty - a commented metadata section with one
    "# key: value" line per entry. An existing file at output_path is overwritten.
    """
    with open(output_path, 'w') as out_file:
        # Header comments, as passed in by the caller
        out_file.writelines(f"{header}\n" for header in header_lines)

        # Data rows; intensity and sigma are indexed by q's row position
        for row, q_value in enumerate(q):
            out_file.write(f"  {q_value:e}    {intensity[row]:e}   {sigma[row]:e}\n")

        # Nothing more to write when there is no metadata
        if not averaged_metadata:
            return

        out_file.write("# METADATA INFORMATION (YML FORMAT, AVERAGED)\n")
        for name, mean_value in averaged_metadata.items():
            out_file.write(f"# {name}: {mean_value}\n")


def average_files_in_directory(input_dir: Path, detector_type: str, pattern_groups: list):
    """
    Takes in an individual SAXS/Reduction or WAXS/Reduction directory and performs averaging
    for files and writes averaged files to SAXS/Averaged or WAXS/Averaged

    Input: input_dir: Directory whose *.dat files are grouped and averaged
           detector_type: Detector label (e.g. "SAXS"); currently not used by this function
           pattern_groups: List of fnmatch pattern lists, one inner list per group
    Output: None. One "<group name>_Averaged.dat" file per matched group is written
            to the "Averaged" directory next to input_dir.
    """
    # glob returns files in no guaranteed order; sort so that each group's first
    # file (which names the output file and supplies its header) is reproducible
    dat_files = sorted(input_dir.glob("*.dat"))

    if not dat_files:
        print(f"No .dat files found in {input_dir}")
        return

    file_groups = group_files_by_fnmatch_patterns(dat_files, pattern_groups)

    # Create output directory as a sibling of the input directory
    output_dir = input_dir.parent / "Averaged"
    output_dir.mkdir(exist_ok=True)

    print(f"Processing {len(file_groups)} groups in {input_dir}")

    # Process each group
    for group_key, files_in_group in file_groups.items():
        print(f"  Averaging {len(files_in_group)} files for pattern: {group_key}")

        # Average the files
        header_lines, avg_q, avg_intensity, avg_sigma, averaged_metadata = average_dat_files(files_in_group)

        # Output filename is the group's base filename + "_Averaged"
        output_path = output_dir / f"{group_key}_Averaged.dat"

        # Save averaged data
        save_averaged_data(header_lines, avg_q, avg_intensity, avg_sigma, averaged_metadata, output_path)

        print(f"    Saved: {output_path}")
            
def process_directory(base_dir: Path, pattern_groups: list = None):
    """
    Main function to process both SAXS and WAXS directories using fnmatch patterns
    Input: base_dir: Path to 1D/ directory containing SAXS/ and WAXS/ subdirectories
           pattern_groups: Optional list of fnmatch pattern lists, one inner list per
           group. When omitted, the notebook-level variable averaging_patterns is
           used, so it must be defined before this function is called.
    Output: None. Progress is printed; a missing SAXS/Reduction or WAXS/Reduction
            directory is reported and skipped.
    """
    if pattern_groups is None:
        # Backward-compatible default: fall back to the notebook-level variable
        pattern_groups = averaging_patterns

    print(f"Processing directory: {base_dir}")
    print(f"Using averaging patterns: {len(pattern_groups)} pattern groups")
    for i, patterns in enumerate(pattern_groups):
        print(f"  Group {i+1}: {patterns}")

    # Both detectors share the same <detector>/Reduction layout
    for detector_type in ("SAXS", "WAXS"):
        reduction_dir = base_dir / detector_type / "Reduction"
        if reduction_dir.exists():
            print(f"\nProcessing {detector_type} files...")
            average_files_in_directory(reduction_dir, detector_type, pattern_groups)
        else:
            print(f"{detector_type} Reduction directory not found: {reduction_dir}")

    print("\nAveraging complete!")



Processing directory: larger_test/1D
Using averaging patterns: 4 pattern groups
  Group 1: ['*Hor_scan_Run4*']
  Group 2: ['*Run10_AcOH*T20*ctr0*']
  Group 3: ['*Run10_AcOH*T22*ctr1*']
  Group 4: ['*Run11*PS_AcOH*T157*ctr55*']
SAXS Reduction directory not found: larger_test/1D/SAXS/Reduction
WAXS Reduction directory not found: larger_test/1D/WAXS/Reduction

Averaging complete!


In [None]:
# Fnmatch pattern groups: each inner list describes one set of files to average together
averaging_patterns = [
    ['*Hor_scan_Run4*'],
    ['*Run10_AcOH*T20*ctr0*'],
    ['*Run10_AcOH*T22*ctr1*'],
    ['*Run11*PS_AcOH*T157*ctr55*'],
]

# Process the entire 1D directory
process_directory(Path("larger_test/1D/"))
