### Imports

In [2]:
import os
import sys
import warnings
from pathlib import Path
import time
import csv
import io

# import threading
# import multiprocessing
import concurrent.futures

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dask.dataframe as dd
import polars as pl

# import pyarrow as pa
# import pyarrow.csv as csv


from scipy.signal import find_peaks

# import requests
# from bs4 import BeautifulSoup

CURRENT_DIR = Path(os.getcwd())

# Move to the root directory

ROOT_DIR = CURRENT_DIR.parents[0]  # Adjust the number based on your folder structure

RAW_FILE_PATH = ROOT_DIR / "wavelength_spectra_files"
RAW_FILE_PATH.mkdir(parents=True, exist_ok=True)
EXPORTS_FILE_PATH = ROOT_DIR / "exports"
EXPORTS_FILE_PATH.mkdir(parents=True, exist_ok=True)

# Add the root directory to the system path

sys.path.append(str(ROOT_DIR))

# Input Raw File and Decoder File

In [5]:
wafer_codes = ["QCI0Q"]  # List of wafer codes
warnings.filterwarnings("ignore")

ANALYSIS_RUN_NAME = "peak_detection_optimising"

SUBARU_DECODER = "QC WAFER_LAYOUT 24Dec.csv"
HALO_DECODER = "HALO_DECODER_NE-rev1_1 logic_coords_annotated.csv"


def find_files_with_wafer_codes(wafer_codes, raw_file_path):
    # Initialize an empty list to store the paths of the files
    file_paths = []
    # Iterate over each wafer code in the list
    for code in wafer_codes:
        # Iterate over each file in the raw file path directory
        for root, dirs, files in os.walk(raw_file_path):
            for file in files:
                # Check if the wafer code is in the file name
                if code in file:
                    # Add the full path of the file to the list
                    file_paths.append(Path(root) / file)
    return file_paths


# Call the function and print the results

file_paths = find_files_with_wafer_codes(wafer_codes, RAW_FILE_PATH)

print(file_paths)

[WindowsPath('c:/Users/762093/Documents/vadankhan-wavelength-spectra-analyser/wavelength_spectra_files/QCI0Q_spectra.csv')]


# Transform Data

### Transform

In [14]:
# Decoder loading function
def load_decoder(decoder_file_path):
    print(f"Loading decoder from: {decoder_file_path}")
    start_time = time.time()

    if not decoder_file_path.exists():
        print(f"Decoder file not found at {decoder_file_path}")
        return pd.DataFrame()

    df_decoder = pd.read_csv(decoder_file_path, usecols=["Logic_X", "Logic_Y", "TE_LABEL", "TYPE"])
    df_decoder = df_decoder.set_index(["Logic_X", "Logic_Y"])

    end_time = time.time()
    print(f"Loaded in {end_time - start_time:.2f} seconds.\n")
    return df_decoder


def get_columns_in_wavelength_range(filepath, wavelength_lb, wavelength_ub):
    # Read just the header row
    with open(filepath, "r") as f:
        header = f.readline().strip().split(",")

    # Find columns that are within the desired wavelength range
    selected_intensity_cols = [col for col in header if col.startswith("Intensity_") and wavelength_lb <= float(col.split("_")[1]) <= wavelength_ub]

    # Add essential metadata columns
    return ["X", "Y"] + selected_intensity_cols


def append_table_to_csv(table, output_path, include_header):
    sink = io.BytesIO()
    csv.write_csv(table, sink, write_options=csv.WriteOptions(include_header=include_header))
    sink.seek(0)
    with open(output_path, "a", encoding="utf-8") as f:
        f.write(sink.read().decode("utf-8"))


def transform_raw_file(filepath, wafer_id, decoder_df, wavelength_lb=824, wavelength_ub=832, chunksize=1000, max_chunks=1):
    print(f"Starting file transformation for {wafer_id}...")
    total_t0 = time.time()

    t1 = time.time()
    col_names = pd.read_csv(filepath, nrows=1).columns
    intensity_cols = [col for col in col_names if col.startswith("Intensity_")]
    wavelengths = {col: float(col.split("_")[1]) for col in intensity_cols}
    selected_intensity_cols = [col for col, wl in wavelengths.items() if wavelength_lb <= wl <= wavelength_ub]
    usecols = ["X", "Y"] + selected_intensity_cols
    data_points_threshold = len(selected_intensity_cols)
    print(f"Header parsing and column filtering took {time.time() - t1:.2f} s")

    with pd.read_csv(filepath, chunksize=chunksize, usecols=usecols) as reader:
        for i, chunk in enumerate(reader):
            if i >= max_chunks:
                break

            chunk_start = time.time()

            # Base transformation
            t_base = time.time()
            long_df = chunk.melt(id_vars=["X", "Y"], value_vars=selected_intensity_cols, var_name="Wavelength", value_name="Intensity")
            long_df["Wavelength"] = long_df["Wavelength"].map(wavelengths)
            long_df = long_df.merge(decoder_df, left_on=["X", "Y"], right_index=True, how="left")
            long_df = long_df.drop(columns=["X", "Y"])
            long_df = long_df[["TYPE", "TE_LABEL", "Wavelength", "Intensity"]]
            t_base_elapsed = time.time() - t_base

            yield long_df, data_points_threshold, t_base_elapsed

            # print(f"Chunk {i+1} | Base transform: {t_base_elapsed:.2f}s | Total time: {time.time() - chunk_start:.2f}s")

    print(f"File transformation for {wafer_id} completed in {time.time() - total_t0:.2f} seconds.")


def extract_top_two_peaks(df_group):
    """
    Detects the top two peaks in a spectrum.
    Returns:
        peak_series (pd.Series): Summary of top peaks and SMSR.
        timing_info (dict): Time taken for each step.
    """
    t_start = time.time()
    timing = {}

    t0 = time.time()
    df_sorted = df_group.sort_values("Wavelength")
    timing["sort"] = time.time() - t0

    t0 = time.time()
    intensities = df_sorted["Intensity"].values
    dB_intensities = df_sorted["dB_Intensity"].values
    wavelengths = df_sorted["Wavelength"].values
    peak_indices, _ = find_peaks(dB_intensities)
    timing["find_peaks"] = time.time() - t0

    t0 = time.time()
    if len(peak_indices) == 0:
        peak_series = pd.Series(
            {
                "highest_peak_wavelength": np.nan,
                "highest_peak_intensity_linear": np.nan,
                "second_peak_wavelength": np.nan,
                "second_peak_intensity_linear": np.nan,
                "SMSR_dB": np.nan,
                "SMSR_linear": np.nan,
            }
        )
        timing["ordering"] = 0.0
        timing["extraction"] = 0.0
        return peak_series, timing

    sorted_order = np.argsort(dB_intensities[peak_indices])[::-1]
    timing["ordering"] = time.time() - t0

    t0 = time.time()
    highest_idx = peak_indices[sorted_order[0]]
    highest_peak_wavelength = wavelengths[highest_idx]
    highest_peak_intensity_linear = intensities[highest_idx]

    if len(sorted_order) > 1:
        second_idx = peak_indices[sorted_order[1]]
        second_peak_wavelength = wavelengths[second_idx]
        second_peak_intensity_linear = intensities[second_idx]
        second_peak_dB = dB_intensities[second_idx]

        SMSR_dB = -second_peak_dB
        SMSR_linear = highest_peak_intensity_linear / second_peak_intensity_linear
    else:
        second_peak_wavelength = np.nan
        second_peak_intensity_linear = np.nan
        SMSR_dB = np.nan
        SMSR_linear = np.nan

    peak_series = pd.Series(
        {
            "highest_peak_wavelength": highest_peak_wavelength,
            "highest_peak_intensity_linear": highest_peak_intensity_linear,
            "second_peak_wavelength": second_peak_wavelength,
            "second_peak_intensity_linear": second_peak_intensity_linear,
            "SMSR_dB": SMSR_dB,
            "SMSR_linear": SMSR_linear,
        }
    )
    timing["extraction"] = time.time() - t0

    return peak_series, timing


def process_export_and_peaks(filepath, wafer_code, decoder_df):
    print(f"\n=== Starting processing for {wafer_code} ===")
    total_t0 = time.time()

    # File paths for the exports
    spectra_output_path = EXPORTS_FILE_PATH / f"{ANALYSIS_RUN_NAME}_{wafer_code}_spectra_formatted.csv"
    peak_output_path = EXPORTS_FILE_PATH / f"{ANALYSIS_RUN_NAME}_{wafer_code}_peaks_summary.csv"

    first_chunk = True
    first_peak = True

    accumulator = {}
    data_point_count = {}

    chunk_counter = 0

    # Column headers for the spectra file
    spectra_columns = ["TYPE", "TE_LABEL", "Wavelength", "Intensity", "dB_Intensity"]
    peak_colums = ["Highest Peak (Wavelength)", "Highest Peak (Linear Intensity)", "Second Peak (Wavelength)", "Second Peak (Linear Intensity)", "SMSR_dB", "SMSR_linear", "TE_LABEL"]

    # Write headers first for both files
    with open(spectra_output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(spectra_columns)
    with open(peak_output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(peak_colums)

    for chunk, data_points_threshold, base_time in transform_raw_file(filepath, wafer_code, decoder_df):
        chunk_counter += 1
        chunk_start = time.time()
        t_peaks_breakdown = {}

        t_peak_total = 0
        t_actual_write_total = 0
        t_dB_total = 0

        for te_label, group in chunk.groupby("TE_LABEL"):
            if te_label not in accumulator:
                accumulator[te_label] = [group]
                data_point_count[te_label] = len(group)
            else:
                accumulator[te_label].append(group)
                data_point_count[te_label] += len(group)

            if data_point_count[te_label] >= data_points_threshold:
                full_data = pd.concat(accumulator[te_label], ignore_index=True)

                # Peak processing
                t_dB_start = time.time()
                max_intensity = full_data["Intensity"].max()
                # Avoid division by zero and log10(0) → use np.where to set zero to np.nan
                safe_intensity = np.where(full_data["Intensity"] > 0, full_data["Intensity"], np.nan)
                # Compute dB only for valid values
                full_data["dB_Intensity"] = 10 * np.log10(safe_intensity / max_intensity)
                t_dB_end = time.time()
                t_dB_total += t_dB_end - t_dB_start

                # Extract top two peaks
                t_peak_start = time.time()
                peak_series, peak_times = extract_top_two_peaks(full_data)
                t_peak_end = time.time()
                t_peak_total += t_peak_end - t_peak_start

                # Add timings to the respective totals
                for k, v in peak_times.items():
                    t_peaks_breakdown[k] = t_peaks_breakdown.get(k, 0.0) + v
                peak_df = pd.DataFrame([peak_series])  # Convert peak series to DataFrame

                # Actual writing of spectra data
                t_actual_write_start = time.time()
                full_data.to_csv(spectra_output_path, mode="a", header=False, index=False)
                peak_df.to_csv(peak_output_path, mode="a", header=False, index=False)
                t_actual_write_end = time.time()
                t_actual_write_total += t_actual_write_end - t_actual_write_start

                # Clean up to free memory
                del accumulator[te_label]
                del data_point_count[te_label]

        chunk_total = time.time() - chunk_start
        print(f"Chunk {chunk_counter} Summary:")
        print(f"  Base transform: {base_time:.2f}s")
        print(f"  dB Calculation: {t_dB_total:.2f}s")
        print(f"  Peak Calculation: {t_peak_total:.2f}s")
        print(f"  Peak detection breakdown:")
        for step, t in t_peaks_breakdown.items():
            print(f"    {step:>10}: {t:.2f}s")
        print(f"  Actual writing time: {t_actual_write_total:.2f}s")
        print(f"  Chunk total:    {chunk_total:.2f}s\n")

    print(f"=== Completed processing {wafer_code} in {time.time() - total_t0:.2f} seconds ===")


# Main execution
start_total_time = time.time()
print("\n=== Starting full processing run ===\n")

print(EXPORTS_FILE_PATH)

# file_paths should be defined elsewhere, assumed to match wafer_codes order
for filepath, wafer_code in zip(file_paths, wafer_codes):
    # Extract product code from the first two characters of the wafer code
    product_code = wafer_code[:2]

    print(f"\n--- Processing wafer: {wafer_code} (Product: {product_code}) ---")

    # Select decoder file based on product code
    if product_code == "QC":
        decoder_path = ROOT_DIR / "decoders" / SUBARU_DECODER
    elif product_code == "QD" or "NV":
        decoder_path = ROOT_DIR / "decoders" / HALO_DECODER
    else:
        print(f"Unsupported product code: {product_code}")
        continue

    # Load decoder and process
    decoder_dict = load_decoder(decoder_path)
    process_export_and_peaks(filepath, wafer_code, decoder_dict)


end_total_time = time.time()
total_time = end_total_time - start_total_time


print(f"\n=== Total processing time: {total_time:.2f} seconds ===\n")


=== Starting full processing run ===

c:\Users\762093\Documents\vadankhan-wavelength-spectra-analyser\exports

--- Processing wafer: QCI0Q (Product: QC) ---
Loading decoder from: c:\Users\762093\Documents\vadankhan-wavelength-spectra-analyser\decoders\QC WAFER_LAYOUT 24Dec.csv
Loaded in 0.78 seconds.


=== Starting processing for QCI0Q ===
Starting file transformation for QCI0Q...
Header parsing and column filtering took 0.07 s
Chunk 1 Summary:
  Base transform: 0.10s
  dB Calculation: 0.57s
  Peak Calculation: 0.81s
  Peak detection breakdown:
          sort: 0.26s
    find_peaks: 0.16s
      ordering: 0.01s
    extraction: 0.36s
  Actual writing time: 3.48s
  Chunk total:    5.39s

File transformation for QCI0Q completed in 5.73 seconds.
=== Completed processing QCI0Q in 5.74 seconds ===

=== Total processing time: 6.53 seconds ===



### Experimental: Sort in Python (slower than JMP)

In [None]:
# Sort final output file by TE_LABEL
def sort_large_csv_with_dask(input_path, output_path):
    print(f"Starting Dask sort for: {input_path}")
    start_time = time.time()

    # Read CSV with Dask
    df = dd.read_csv(input_path, assume_missing=True)  # assume_missing=True is safe for mixed data

    # Sort by TE_LABEL first, then Wavelength
    df_sorted = df.sort_values(by=["TE_LABEL", "Wavelength"])

    # Save to CSV (can write to multiple files if very large)
    df_sorted.to_csv(output_path, index=False, single_file=True)

    end_time = time.time()
    print(f"Sorting and export completed in {end_time - start_time:.2f} seconds.")


# Call for each wafer (in case of multiple)
for wafer_code in wafer_codes:
    input_csv = EXPORTS_FILE_PATH / f"{ANALYSIS_RUN_NAME}_{wafer_code}_spectra_formatted.csv"
    output_csv = EXPORTS_FILE_PATH / f"{ANALYSIS_RUN_NAME}_{wafer_code}_sorted.csv"
    sort_large_csv_with_dask(input_csv, output_csv)

### Experimental: Threading Attempt

In [None]:
ANALYSIS_RUN_NAME = "larger"

RAW_FILE_PATH = ROOT_DIR / "wavelength_spectra_files"
EXPORTS_FILE_PATH = ROOT_DIR / "exports"


def transform_raw_file(filepath, wafer_id, wavelength_lb=827, wavelength_ub=830, chunksize=1000, max_chunks=1000):
    col_names = pd.read_csv(filepath, nrows=1).columns  # Read just headers
    intensity_cols = [col for col in col_names if col.startswith("Intensity_")]

    # Extract wavelengths from column names
    wavelengths = {col: float(col.split("_")[1]) for col in intensity_cols}

    # Filter columns to only include those within the desired range
    selected_intensity_cols = [col for col, wl in wavelengths.items() if wavelength_lb <= wl <= wavelength_ub]

    # Define columns to read
    usecols = ["X", "Y"] + selected_intensity_cols

    with pd.read_csv(filepath, chunksize=chunksize, usecols=usecols) as reader:
        for i, chunk in enumerate(reader):
            if i >= max_chunks:
                break  # Stop after processing max_chunks

            # Melt the dataframe: Convert wide format to long format
            long_df = chunk.melt(id_vars=["X", "Y"], value_vars=selected_intensity_cols, var_name="Wavelength", value_name="Intensity")

            # Convert "Wavelength" column from "Intensity_xxx" to just "xxx"
            long_df["Wavelength"] = long_df["Wavelength"].map(wavelengths)

            yield long_df  # Yield processed chunk instead of storing in memory


def process_and_export(filepath, wafer_code, export_path, run_name):
    try:
        print(f"Processing file: {filepath} for wafer: {wafer_code}")
        output_path = export_path / f"{run_name}_threaded_{wafer_code}_spectra_formatted.csv"
        first_chunk = True

        for transformed_chunk in transform_raw_file(filepath, wafer_code):
            # export_start_time = time.time()
            transformed_chunk.to_csv(output_path, mode="w" if first_chunk else "a", header=first_chunk, index=False)
            first_chunk = False  # Only write header for the first chunk
            # print(f"Chunk exported for wafer {wafer_code}. Time taken: {time.time() - export_start_time:.2f} seconds.")
    except Exception as e:
        print(f"Error processing wafer {wafer_code}: {e}")


def parallel_processing(filepaths, wafer_codes, export_path, run_name):
    print(f"Starting parallel processing with {len(filepaths)} files.")
    # Use ThreadPoolExecutor instead of ProcessPoolExecutor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_and_export, f, w, export_path, run_name): (f, w) for f, w in zip(filepaths, wafer_codes)}
        for future in concurrent.futures.as_completed(futures):
            filepath, wafer_code = futures[future]
            try:
                future.result()  # This will raise any exception raised in the process
            except Exception as e:
                print(f"Error with file {filepath}, wafer {wafer_code}: {e}")


# CALLING THE CODE
if __name__ == "__main__":
    start_total_time = time.time()
    print("\n=== Starting full processing run with multiprocessing ===\n")
    parallel_processing(file_paths, wafer_codes, EXPORTS_FILE_PATH, ANALYSIS_RUN_NAME)
    end_total_time = time.time()
    total_time = end_total_time - start_total_time
    print(f"\n=== Total processing time: {total_time:.2f} seconds ===\n")