### Imports

In [1]:
import sys
import time
import os
from pathlib import Path
import warnings
import requests

import pandas as pd
import numpy as np


import matplotlib.pyplot as plt
import statsmodels.api as sm
from bs4 import BeautifulSoup


from scipy.stats import linregress
from scipy.signal import cheby1, filtfilt, savgol_filter
from scipy.optimize import curve_fit, minimize


CURRENT_DIR = Path(os.getcwd())


# Move to the root directory


ROOT_DIR = CURRENT_DIR.parents[0]  # Adjust the number based on your folder structure


# Add the root directory to the system path


sys.path.append(str(ROOT_DIR))


# Import the importlib module


import importlib


# import function implementations
import stst_urls


# Reload the modules


importlib.reload(stst_urls)


# Re-import the functions


from stst_urls import GTX_URL

# Input Raw File and Decoder File

In [36]:
wafer_codes = [
    "QCI12",
    "QCI2X",
    "QCI2K",
    "QCI2T",
    "QCI48",
    "QCI2V",
    "QCI0M",
    "QCI46",
    "QCI3L",
]  # List of wafer codes

# Recent Good Wafers (non retest SQL check)
# QCI12
# QCI2X
# QCI2K
# QCI2T
# QCI48
# QCI2V
# QCI0M
# QCI46
# QCI33
# QCI3L
# QCI3E


ANALYSIS_RUN_NAME = "Mechanistic_Port_Study"

DECODER_FILE = "QC WAFER_LAYOUT 24Dec.csv"
DECODER_FILE_PATH = ROOT_DIR / "decoders" / DECODER_FILE
RESULTS_FILE_PATH = ROOT_DIR / "results"

EXPORTS_FILEPATH = ROOT_DIR / "exports"
# Create the exports folder if it doesn't exist
if not os.path.exists(EXPORTS_FILEPATH):
    os.makedirs(EXPORTS_FILEPATH)
# print(EXPORTS_FILEPATH)

warnings.filterwarnings("ignore")


def liv_raw_filelink_finder(wafer_codes, fileserver_link: str, product_code="QC"):
    fileserver_link = f"{fileserver_link}{product_code}/"
    print(f"fileserver link: {fileserver_link}")

    response = requests.get(fileserver_link, verify=False)
    soup = BeautifulSoup(response.content, "html.parser")
    links = soup.find_all("a")

    subdirectory_urls = []
    for link in links:
        href = link.get("href")
        if href and any(wafer_code in href for wafer_code in wafer_codes):
            subdirectory_urls.append(fileserver_link + href)

    # RAW file tracking
    file_urls = []
    file_cod_urls = []
    file_degradation_urls = []
    file_times = []
    file_cod_times = []
    file_degradation_times = []

    machine_list = []
    machine_dict = {}

    # Processed COD tracking
    processed_cod70_urls = []
    processed_cod250_urls = []
    processed_cod_base_urls = []

    for wafer_code, subdirectory_url in zip(wafer_codes, subdirectory_urls):
        response = requests.get(subdirectory_url, verify=False)
        soup = BeautifulSoup(response.content, "html.parser")
        links = soup.find_all("a")

        latest_file = None
        latest_cod_file = None
        latest_degradation_file = None
        latest_time = ""
        latest_cod_time = ""
        latest_degradation_time = ""
        machine_name = None

        # Processed COD placeholders
        proc_cod70 = None
        proc_cod250 = None
        proc_cod_base = None

        for link in links:
            href = link.get("href")
            if not href:
                continue

            # RAW file logic
            if "RAW" in href:
                time_str = href[-18:-4]  # Timestamp from filename

                if not machine_name:
                    machine_name = href[:6]  # Extract machine name

                if "COD250" in href:
                    if time_str > latest_cod_time:
                        latest_cod_time = time_str
                        latest_cod_file = subdirectory_url + href
                elif "COD70" in href:
                    if time_str > latest_degradation_time:
                        latest_degradation_time = time_str
                        latest_degradation_file = subdirectory_url + href
                else:
                    if time_str > latest_time:
                        latest_time = time_str
                        latest_file = subdirectory_url + href

            # Processed COD logic (only pick the first match for each type)
            elif "processed" in href and "COD" in href:
                full_url = subdirectory_url + href
                if "COD250" in href and proc_cod250 is None:
                    proc_cod250 = full_url
                elif "COD70" in href and proc_cod70 is None:
                    proc_cod70 = full_url
                elif "COD" in href and "COD250" not in href and "COD70" not in href and proc_cod_base is None:
                    proc_cod_base = full_url

        # Append results
        if latest_file:
            file_urls.append(latest_file)
            file_times.append(latest_time)
        if latest_cod_file:
            file_cod_urls.append(latest_cod_file)
            file_cod_times.append(latest_cod_time)
        if latest_degradation_file:
            file_degradation_urls.append(latest_degradation_file)
            file_degradation_times.append(latest_degradation_time)
        if machine_name:
            machine_list.append(machine_name)
            machine_dict[wafer_code] = machine_name

        # Append processed CODs
        processed_cod70_urls.append(proc_cod70)
        processed_cod250_urls.append(proc_cod250)
        processed_cod_base_urls.append(proc_cod_base)

    return (
        file_urls,
        file_cod_urls,
        file_degradation_urls,
        file_times,
        file_cod_times,
        file_degradation_times,
        machine_list,
        machine_dict,
        processed_cod70_urls,
        processed_cod250_urls,
        processed_cod_base_urls,
    )


# Calling code
(
    file_urls,
    file_cod_urls,
    file_degradation_urls,
    file_times,
    file_cod_times,
    file_degradation_times,
    machine_list,
    machine_dict,
    processed_cod70_urls,
    processed_cod250_urls,
    processed_cod_base_urls,
) = liv_raw_filelink_finder(wafer_codes, GTX_URL, "QC")
print(processed_cod_base_urls)

# DEBUG: INPUT LINKS TO OTHER GTX FILES HERE
# file_urls = [
#     "https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCHWQ/LIV_53_QCHWQ_DNS-LIVTKCOD_LCRVCOD250-DNS_RAW20250227044906.CSV",
#     "https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCHWQ/LIV_53_QCHWQ_LIVBLTKCOD_COD250-DNS_RAW20250228082707.CSV",
#     "https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCHWQ/LIV_53_QCHWQ_LIVBLTKCOD_COD250-DNS_RAW20250311164324.CSV",
# ]
# print(file_urls)

fileserver link: https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/
['https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCI0M/LIV_54_QCI0M_DNS-LIVTKCOD_LIVTK-DNS_STX_processed.csv', 'https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCI2K/LIV_53_QCI2K_DNS-LIVTKCOD_LIVTK-DNS_STX_processed.csv', 'https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCI2T/LIV_54_QCI2T_DNS-LIVTKCOD_LIVTK-DNS_STX_processed.csv', 'https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCI2V/LIV_54_QCI2V_DNS-LIVTKCOD_LIVTK-DNS_STX_processed.csv', 'https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCI2X/LIV_53_QCI2X_DNS-LIVTKCOD_LIVTK-DNS_STX_processed.csv', 'https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdLot/QC/QCI3E/LIV_54_QCI3E_DNS-LIVTKCOD_LIVTK-DNS_STX_processed.csv', 'https://sprgtxprod02.stni.seagate.com/~gtx/wafer/proc_LIV/data/byProdL

# Transform Data to Desired Raw Sweep Format

- selects required columns
- transposes
- stacks data in tall format
- adds in device coords from decoder file
- loops for every csv file chosen, and stores raw_sweep dataframes

In [37]:
def transform_raw_liv_file(file_url, decoder_file_path, machine_code, wafer_id):
    start_time = time.time()

    # Step 1: Read the CSV file from the URL, skipping the first 19 rows
    print("Step 1: Reading the CSV file...")
    df = pd.read_csv(
        file_url,
        skiprows=19,
    )
    print(f"Step 1 completed in {time.time() - start_time:.2f} seconds")

    # Step 3: Get column names and subset the data frame with selected columns
    print("Step 3: Subsetting the data frame...")
    col_names = df.columns
    selected_cols = [col for col in col_names if "Vf" in col or "PD" in col]
    df_subset = df[selected_cols]
    cols_to_delete = [col for col in df_subset.columns if "Vf@" in col or "PD@" in col]
    df_subset.drop(columns=cols_to_delete, inplace=True)
    print(f"Step 3 completed in {time.time() - start_time:.2f} seconds")

    # Step 4: Transpose the data frame and reset index
    print("Step 4: Transposing the data frame...")
    df_transposed = df_subset.transpose()
    df_transposed.reset_index(inplace=True)
    new_columns = ["Label"] + list(range(1, len(df_transposed.columns)))
    df_transposed.columns = new_columns
    df_transposed.loc[-1] = new_columns  # Add the new row at the top
    df_transposed.index = df_transposed.index + 1  # Shift the index
    df_transposed = df_transposed.sort_index()  # Sort by index to place the new row at the top
    print(f"Step 4 completed in {time.time() - start_time:.2f} seconds")

    # Step 5: Split transposed table into Vf and PD data tables
    print("Step 5: Splitting the transposed table...")
    df_vf = df_transposed[df_transposed["Label"].str.contains("Vf")]
    df_pd = df_transposed[df_transposed["Label"].str.contains("PD")]
    df_vf.drop(columns=["Label"], inplace=True)
    df_pd.drop(columns=["Label"], inplace=True)
    print(f"Step 5 completed in {time.time() - start_time:.2f} seconds")

    # Step 6: Learn data dimensions
    print("Step 6: Learning data dimensions...")
    n_meas = df_vf.shape[0]
    print(f"Number of Current Measurements per Device: {n_meas}")
    n_devices = df_vf.shape[1]
    print(f"Number of Devices: {n_devices}")
    print(f"Step 6 completed in {time.time() - start_time:.2f} seconds")

    # Step 7: Concatenate all Voltage columns into one
    print("Step 7: Concatenating Voltage columns...")
    df_concat_vf = pd.concat([df_vf[col] for col in df_vf.columns], ignore_index=True).to_frame(name="Vf")
    df_concat_vf["TOUCHDOWN"] = [i // n_meas + 1 for i in range(n_meas * n_devices)]
    print(f"Step 7 completed in {time.time() - start_time:.2f} seconds")

    # Step 8: Concatenate all PD columns into one
    print("Step 8: Concatenating PD columns...")
    df_concat_pd = pd.concat([df_pd[col] for col in df_pd.columns], ignore_index=True).to_frame(name="PD")
    print(f"Step 8 completed in {time.time() - start_time:.2f} seconds")

    # Step 9: Cartesian join of Vf and PD data tables
    print("Step 9: Performing Cartesian join...")
    df_raw_sweeps = pd.concat([df_concat_vf, df_concat_pd], axis=1)
    print(f"Step 9 completed in {time.time() - start_time:.2f} seconds")

    # Step 10: Add device coordinates from original RAW file
    print("Step 10: Adding device coordinates...")
    if "TOUCHDOWN" in df.columns and "STX_WAFER_X_UM" in df.columns and "STX_WAFER_Y_UM" in df.columns:
        df_raw_sweeps = df_raw_sweeps.merge(
            df[["TOUCHDOWN", "STX_WAFER_X_UM", "STX_WAFER_Y_UM"]], on="TOUCHDOWN", how="left"
        )
    else:
        print("Required columns for merging device coordinates are missing in the original RAW file.")
    print(f"Step 10 completed in {time.time() - start_time:.2f} seconds")

    # Step 11: Merge with decoder file to get TE_LABEL etc.
    print("Step 11: Merging with decoder file...")
    if decoder_file_path.exists():
        df_decoder = pd.read_csv(decoder_file_path)
        if "YMIN" in df_decoder.columns and "XMIN" in df_decoder.columns:
            df_raw_sweeps = df_raw_sweeps.merge(
                df_decoder[["YMIN", "XMIN", "TE_LABEL", "TYPE"]],
                left_on=["STX_WAFER_Y_UM", "STX_WAFER_X_UM"],
                right_on=["YMIN", "XMIN"],
                how="left",
            ).drop(columns=["YMIN", "XMIN"])
        else:
            print("Required columns for merging decoder data are missing in the decoder file.")
    else:
        print(f"Decoder file not found at {decoder_file_path}")
    print(f"Step 11 completed in {time.time() - start_time:.2f} seconds")

    # Step 12: Rename the columns
    print("Step 12: Renaming columns...")
    df_raw_sweeps.rename(columns={"STX_WAFER_X_UM": "X_UM", "STX_WAFER_Y_UM": "Y_UM"}, inplace=True)
    print(f"Step 12 completed in {time.time() - start_time:.2f} seconds")

    # Step 13: Add current column as a repeating sequence of length n_meas
    print("Step 13: Adding current column...")
    df_raw_sweeps["LDI_mA"] = [i % n_meas + 1 for i in range(len(df_raw_sweeps))]
    print(f"Step 13 completed in {time.time() - start_time:.2f} seconds")

    # Step 14: Add a column for WAFER_ID with the wafer_id value repeated for every row
    print("Step 14: Adding WAFER_ID column...")
    df_raw_sweeps.insert(0, "WAFER_ID", wafer_id)
    print(f"Step 14 completed in {time.time() - start_time:.2f} seconds")

    # Step 15: Add a column for machine_code with the machine_code value repeated for every row
    print("Step 15: Adding MACHINE_CODE column...")
    df_raw_sweeps.insert(0, "MACH", machine_code)
    print(f"Step 15 completed in {time.time() - start_time:.2f} seconds")

    total_time = time.time() - start_time
    print(f"Total time taken: {total_time:.2f} seconds")

    return df_raw_sweeps


def transform_raw_liv_file_first_n_rows(file_url, decoder_file_path, machine_code, wafer_id, n):
    start_time_overall = time.time()

    # Step 2: Read the data rows, skipping the header rows
    print("Step 2: Reading the data rows, skipping the header rows...")
    start_time = time.time()
    df = pd.read_csv(file_url, skiprows=19, nrows=n)
    print(f"Step 2 completed in {time.time() - start_time:.2f} seconds")

    # Step 3: Get column names and subset the data frame with selected columns
    print("Step 3: Subsetting the data frame...")
    start_time = time.time()
    col_names = df.columns
    selected_cols = [col for col in col_names if "Vf" in col or "PD" in col]
    df_subset = df[selected_cols]
    cols_to_delete = [col for col in df_subset.columns if "Vf@" in col or "PD@" in col]
    df_subset.drop(columns=cols_to_delete, inplace=True)
    print(f"Step 3 completed in {time.time() - start_time:.2f} seconds")

    # Step 4: Transpose the data frame and reset index
    print("Step 4: Transposing the data frame...")
    start_time = time.time()
    df_transposed = df_subset.transpose()
    df_transposed.reset_index(inplace=True)
    new_columns = ["Label"] + list(range(1, len(df_transposed.columns)))
    df_transposed.columns = new_columns
    df_transposed.loc[-1] = new_columns  # Add the new row at the top
    df_transposed.index = df_transposed.index + 1  # Shift the index
    df_transposed = df_transposed.sort_index()  # Sort by index to place the new row at the top
    print(f"Step 4 completed in {time.time() - start_time:.2f} seconds")

    # Step 5: Split transposed table into Vf and PD data tables
    print("Step 5: Splitting the transposed table...")
    start_time = time.time()
    df_vf = df_transposed[df_transposed["Label"].str.contains("Vf")]
    df_pd = df_transposed[df_transposed["Label"].str.contains("PD")]
    df_vf.drop(columns=["Label"], inplace=True)
    df_pd.drop(columns=["Label"], inplace=True)
    print(f"Step 5 completed in {time.time() - start_time:.2f} seconds")

    # Step 6: Learn data dimensions
    print("Step 6: Learning data dimensions...")
    start_time = time.time()
    n_meas = df_vf.shape[0]
    print(f"Number of Current Measurements per Device: {n_meas}")
    n_devices = df_vf.shape[1]
    print(f"Number of Devices: {n_devices}")
    print(f"Step 6 completed in {time.time() - start_time:.2f} seconds")

    # Step 7: Concatenate all Voltage columns into one
    print("Step 7: Concatenating Voltage columns...")
    start_time = time.time()
    df_concat_vf = pd.concat([df_vf[col] for col in df_vf.columns], ignore_index=True).to_frame(name="Vf")
    df_concat_vf["TOUCHDOWN"] = [i // n_meas + 1 for i in range(n_meas * n_devices)]
    print(f"Step 7 completed in {time.time() - start_time:.2f} seconds")

    # Step 8: Concatenate all PD columns into one
    print("Step 8: Concatenating PD columns...")
    start_time = time.time()
    df_concat_pd = pd.concat([df_pd[col] for col in df_pd.columns], ignore_index=True).to_frame(name="PD")
    print(f"Step 8 completed in {time.time() - start_time:.2f} seconds")

    # Step 9: Cartesian join of Vf and PD data tables
    print("Step 9: Performing Cartesian join...")
    start_time = time.time()
    df_raw_sweeps = pd.concat([df_concat_vf, df_concat_pd], axis=1)
    print(f"Step 9 completed in {time.time() - start_time:.2f} seconds")

    # Step 10: Add device coordinates from original RAW file
    print("Step 10: Adding device coordinates...")
    start_time = time.time()
    if "TOUCHDOWN" in df.columns and "STX_WAFER_X_UM" in df.columns and "STX_WAFER_Y_UM" in df.columns:
        df_raw_sweeps = df_raw_sweeps.merge(
            df[["TOUCHDOWN", "STX_WAFER_X_UM", "STX_WAFER_Y_UM"]], on="TOUCHDOWN", how="left"
        )
    else:
        print("Required columns for merging device coordinates are missing in the original RAW file.")
    print(f"Step 10 completed in {time.time() - start_time:.2f} seconds")

    # Step 11: Merge with decoder file to get TE_LABEL etc.
    print("Step 11: Merging with decoder file...")
    start_time = time.time()
    if decoder_file_path.exists():
        df_decoder = pd.read_csv(decoder_file_path)
        if "YMIN" in df_decoder.columns and "XMIN" in df_decoder.columns:
            df_raw_sweeps = df_raw_sweeps.merge(
                df_decoder[["YMIN", "XMIN", "TE_LABEL", "TYPE"]],
                left_on=["STX_WAFER_Y_UM", "STX_WAFER_X_UM"],
                right_on=["YMIN", "XMIN"],
                how="left",
            ).drop(columns=["YMIN", "XMIN"])
        else:
            print("Required columns for merging decoder data are missing in the decoder file.")
    else:
        print(f"Decoder file not found at {decoder_file_path}")
    print(f"Step 11 completed in {time.time() - start_time:.2f} seconds")

    # Step 12: Rename the columns
    print("Step 12: Renaming columns...")
    start_time = time.time()
    df_raw_sweeps.rename(columns={"STX_WAFER_X_UM": "X_UM", "STX_WAFER_Y_UM": "Y_UM"}, inplace=True)
    print(f"Step 12 completed in {time.time() - start_time:.2f} seconds")

    # Step 13: Add current column as a repeating sequence of length n_meas
    print("Step 13: Adding current column...")
    start_time = time.time()
    df_raw_sweeps["LDI_mA"] = [i % n_meas + 1 for i in range(len(df_raw_sweeps))]
    print(f"Step 13 completed in {time.time() - start_time:.2f} seconds")

    # Step 14: Add a column for WAFER_ID with the wafer_id value repeated for every row
    print("Step 14: Adding WAFER_ID column...")
    start_time = time.time()
    df_raw_sweeps.insert(0, "WAFER_ID", wafer_id)
    print(f"Step 14 completed in {time.time() - start_time:.2f} seconds")

    # Step 15: Add a column for machine_code with the machine_code value repeated for every row
    print("Step 15: Adding MACHINE_CODE column...")
    df_raw_sweeps.insert(0, "MACH", machine_code)
    print(f"Step 15 completed in {time.time() - start_time:.2f} seconds")

    total_time = time.time() - start_time_overall
    print(f"Total time taken: {total_time:.2f} seconds")

    sampling_rate = 1

    return (df_raw_sweeps, n_meas, n_devices, sampling_rate)


def transform_raw_liv_file_every_nth_laser(file_url, decoder_file_path, machine_code, wafer_id, n=10000):
    start_time_overall = time.time()

    # Step 2: Read the data rows, skipping the header rows
    print("Step 2: Reading the data rows, skipping the header rows...")
    start_time = time.time()
    df = pd.read_csv(file_url, skiprows=19)
    print(f"Step 2 completed in {time.time() - start_time:.2f} seconds")

    # Step 3: Filter every nth laser based on TOUCHDOWN
    print(f"Step 3: Filtering every {n}th laser...")
    start_time = time.time()
    df_filtered = df[df["TOUCHDOWN"] % n == 0]
    print(f"Step 3 completed in {time.time() - start_time:.2f} seconds")

    # Step 4: Get column names and subset the data frame with selected columns
    print("Step 4: Subsetting the data frame...")
    start_time = time.time()
    col_names = df_filtered.columns
    selected_cols = [col for col in col_names if "Vf" in col or "PD" in col]
    df_subset = df_filtered[selected_cols]
    cols_to_delete = [col for col in df_subset.columns if "Vf@" in col or "PD@" in col]
    df_subset.drop(columns=cols_to_delete, inplace=True)
    print(f"Step 4 completed in {time.time() - start_time:.2f} seconds")

    # Step 5: Transpose the data frame and reset index
    print("Step 5: Transposing the data frame...")
    start_time = time.time()
    df_transposed = df_subset.transpose()
    df_transposed.reset_index(inplace=True)
    new_columns = ["Label"] + list(range(1, len(df_transposed.columns)))
    df_transposed.columns = new_columns
    df_transposed.loc[-1] = new_columns  # Add the new row at the top
    df_transposed.index = df_transposed.index + 1  # Shift the index
    df_transposed = df_transposed.sort_index()  # Sort by index to place the new row at the top
    print(f"Step 5 completed in {time.time() - start_time:.2f} seconds")

    # Step 6: Split transposed table into Vf and PD data tables
    print("Step 6: Splitting the transposed table...")
    start_time = time.time()
    df_vf = df_transposed[df_transposed["Label"].str.contains("Vf")]
    df_pd = df_transposed[df_transposed["Label"].str.contains("PD")]
    df_vf.drop(columns=["Label"], inplace=True)
    df_pd.drop(columns=["Label"], inplace=True)
    print(f"Step 6 completed in {time.time() - start_time:.2f} seconds")

    # Step 7: Learn data dimensions
    print("Step 7: Learning data dimensions...")
    start_time = time.time()
    n_meas = df_vf.shape[0]
    print(f"Number of Current Measurements per Device: {n_meas}")
    n_devices = df_vf.shape[1]
    print(f"Number of Devices: {n_devices}")
    print(f"Step 7 completed in {time.time() - start_time:.2f} seconds")

    # Step 8: Concatenate all Voltage columns into one
    print("Step 8: Concatenating Voltage columns...")
    start_time = time.time()
    df_concat_vf = pd.concat([df_vf[col] for col in df_vf.columns], ignore_index=True).to_frame(name="Vf")
    # Instead of generating a new TOUCHDOWN, reuse the one from df_filtered
    df_concat_vf["TOUCHDOWN"] = df_filtered["TOUCHDOWN"].repeat(n_meas).values
    print(f"Step 8 completed in {time.time() - start_time:.2f} seconds")

    # Step 9: Concatenate all PD columns into one
    print("Step 9: Concatenating PD columns...")
    start_time = time.time()
    df_concat_pd = pd.concat([df_pd[col] for col in df_pd.columns], ignore_index=True).to_frame(name="PD")
    print(f"Step 9 completed in {time.time() - start_time:.2f} seconds")

    # Step 10: Cartesian join of Vf and PD data tables
    print("Step 10: Performing Cartesian join...")
    start_time = time.time()
    df_raw_sweeps = pd.concat([df_concat_vf, df_concat_pd], axis=1)
    print(f"Step 10 completed in {time.time() - start_time:.2f} seconds")

    # Step 11: Add device coordinates from original RAW file
    print("Step 11: Adding device coordinates...")
    start_time = time.time()
    if "TOUCHDOWN" in df.columns and "STX_WAFER_X_UM" in df.columns and "STX_WAFER_Y_UM" in df.columns:
        df_raw_sweeps = df_raw_sweeps.merge(
            df[["TOUCHDOWN", "STX_WAFER_X_UM", "STX_WAFER_Y_UM"]], on="TOUCHDOWN", how="left"
        )
    else:
        print("Required columns for merging device coordinates are missing in the original RAW file.")
    print(f"Step 11 completed in {time.time() - start_time:.2f} seconds")

    # Step 12: Merge with decoder file to get TE_LABEL etc.
    print("Step 12: Merging with decoder file...")
    start_time = time.time()
    if decoder_file_path.exists():
        df_decoder = pd.read_csv(decoder_file_path)
        if "YMIN" in df_decoder.columns and "XMIN" in df_decoder.columns:
            df_raw_sweeps = df_raw_sweeps.merge(
                df_decoder[["YMIN", "XMIN", "TE_LABEL", "TYPE"]],
                left_on=["STX_WAFER_Y_UM", "STX_WAFER_X_UM"],
                right_on=["YMIN", "XMIN"],
                how="left",
            ).drop(columns=["YMIN", "XMIN"])
        else:
            print("Required columns for merging decoder data are missing in the decoder file.")
    else:
        print(f"Decoder file not found at {decoder_file_path}")
    print(f"Step 12 completed in {time.time() - start_time:.2f} seconds")

    # Step 13: Rename the columns
    print("Step 13: Renaming columns...")
    start_time = time.time()
    df_raw_sweeps.rename(columns={"STX_WAFER_X_UM": "X_UM", "STX_WAFER_Y_UM": "Y_UM"}, inplace=True)
    print(f"Step 13 completed in {time.time() - start_time:.2f} seconds")

    # Step 14: Add current column as a repeating sequence of length n_meas
    print("Step 14: Adding current column...")
    start_time = time.time()
    df_raw_sweeps["LDI_mA"] = [i % n_meas + 1 for i in range(len(df_raw_sweeps))]
    print(f"Step 14 completed in {time.time() - start_time:.2f} seconds")

    # Step 15: Add a column for WAFER_ID with the wafer_id value repeated for every row
    print("Step 15: Adding WAFER_ID column...")
    start_time = time.time()
    df_raw_sweeps.insert(0, "WAFER_ID", wafer_id)
    print(f"Step 15 completed in {time.time() - start_time:.2f} seconds")

    # Step 16: Add a column for machine_code with the machine_code value repeated for every row
    print("Step 16: Adding MACHINE_CODE column...")
    df_raw_sweeps.insert(0, "MACH", machine_code)
    print(f"Step 16 completed in {time.time() - start_time:.2f} seconds")

    total_time = time.time() - start_time_overall
    print(f"Total time taken: {total_time:.2f} seconds")

    sampling_rate = n

    return (df_raw_sweeps, n_meas, n_devices, sampling_rate)


raw_sweeps_tables = []
device_numbers = []
sampling_rates = []

warnings.filterwarnings("ignore")

ROW_NUMBER = 1000

# # CAL
# for file_url, machine_code in zip(file_urls, machine_list):
#     df_raw_sweeps, n_meas, n_devices = transform_raw_liv_file_first_n_rows(file_url, DECODER_FILE_PATH, machine_code, ROW_NUMBER)
#     if df_raw_sweeps["TE_LABEL"].isna().any():
#         raise ValueError("ERROR: Decoder Matching Failed! Perhaps the wrong decoder file was used, no matching X or Y coords found")
#     raw_sweeps_tables.append(df_raw_sweeps)
#     device_numbers.append(n_devices)

# # Display the first 10 rows of the raw_sweeps table
# print(raw_sweeps_tables[0].head(10))
# print(device_numbers[0])

# DEBUG: CALLING SAMPLED DATA ACROSS MULTIPLE WAFERS
SAMPLE_ROWS = 10000  # You can change this value to any other number as needed
for file_url, machine_code, wafer_code in zip(file_urls, machine_list, wafer_codes):
    df_raw_sweeps, n_meas, n_devices, sampling_rate = transform_raw_liv_file_every_nth_laser(
        file_url, DECODER_FILE_PATH, machine_code, wafer_code, n=SAMPLE_ROWS
    )
    if df_raw_sweeps["TE_LABEL"].isna().any():
        raise ValueError(
            "ERROR: Decoder Matching Failed! Perhaps the wrong decoder file was used, no matching X or Y coords found"
        )
    raw_sweeps_tables.append(df_raw_sweeps)
    device_numbers.append(n_devices)
    sampling_rates.append(sampling_rate)

# # Concatenate all dataframes together
# df_combined = pd.concat(raw_sweeps_tables, ignore_index=True)
# # easy measure to make rest of code call:
# raw_sweeps_tables = [df_combined]

# # Display the first 10 rows of the combined dataframe
# print(df_combined.head(10))
# print(raw_sweeps_tables[0].head(10))

Step 2: Reading the data rows, skipping the header rows...
Step 2 completed in 10.65 seconds
Step 3: Filtering every 10000th laser...
Step 3 completed in 0.01 seconds
Step 4: Subsetting the data frame...
Step 4 completed in 0.01 seconds
Step 5: Transposing the data frame...
Step 5 completed in 0.01 seconds
Step 6: Splitting the transposed table...
Step 6 completed in 0.01 seconds
Step 7: Learning data dimensions...
Number of Current Measurements per Device: 63
Number of Devices: 34
Step 7 completed in 0.00 seconds
Step 8: Concatenating Voltage columns...
Step 8 completed in 0.00 seconds
Step 9: Concatenating PD columns...
Step 9 completed in 0.00 seconds
Step 10: Performing Cartesian join...
Step 10 completed in 0.00 seconds
Step 11: Adding device coordinates...
Step 11 completed in 0.01 seconds
Step 12: Merging with decoder file...
Step 12 completed in 1.39 seconds
Step 13: Renaming columns...
Step 13 completed in 0.00 seconds
Step 14: Adding current column...
Step 14 completed in 0.0

# General Processing Functions

Some predefined functions that can be used outside of ITH

In [38]:
def basic_sweep_analysis(df):
    """
    Compute first and second order differentials for voltage (Vf) and photodiode signal (PD)
    while ensuring calculations remain per device.
    Additionally, compute min and max PD per touchdown and clone max PD across the sweep.
    """
    df["dV/dI"] = df.groupby("TOUCHDOWN")["Vf"].diff()
    df["dP/dI"] = df.groupby("TOUCHDOWN")["PD"].diff()
    df["d2V/dI2"] = df.groupby("TOUCHDOWN")["dV/dI"].diff()
    df["d2P/dI2"] = df.groupby("TOUCHDOWN")["dP/dI"].diff()

    df["MAX_PD"] = df.groupby("TOUCHDOWN")["PD"].transform("max")
    df["MIN_PD"] = df.groupby("TOUCHDOWN")["PD"].transform("min")
    return df


def flag_no_laser_touchdowns(df_raw_sweeps):
    """
    Adds a "FLAG" column to df_raw_sweeps, labeling touchdowns as "NO LASER"
    if the max PD value for that touchdown is below 1.
    """
    df_raw_sweeps["FLAG"] = np.nan
    no_laser_touchdowns = df_raw_sweeps.groupby("TOUCHDOWN")["PD"].max()
    no_laser_touchdowns = no_laser_touchdowns[no_laser_touchdowns < 1].index
    df_raw_sweeps.loc[df_raw_sweeps["TOUCHDOWN"].isin(no_laser_touchdowns), "FLAG"] = "NO LASER"
    return df_raw_sweeps


# Linear model for line fitting
def linear_model(x, slope, intercept):
    return slope * x + intercept


# Least Absolute Residuals fitting function using L1 norm
def least_absolute_residuals_fit(x, y, model, initial_guess, bounds):
    def objective(params):
        return np.sum(np.abs(model(x, *params) - y))

    result = minimize(objective, initial_guess, bounds=bounds, method="L-BFGS-B", options={"maxiter": 1000})

    residuals = np.abs(model(x, *result.x) - y)
    mean_abs_error = np.mean(residuals)

    return result.x, mean_abs_error

# I_th Data Processing

- Has a function that finds ith given an input intensity and current array
    - **NB:** find_ith_value_labview is trying to mimic labview more closely, while the find_ith_value is a custom function that finds ith value using a similar but not identical mechanisim
- Calls the find_ith_value function on the multiple lasers in the raw sweep file with evaluate_ITH_on_rawsweep
- Then generates a device level summary using generate_ITH_device_summary_table
- calls this code and exports in the main for loop.

In [42]:
def find_ith_value(intensity, current, min_slope_fitpnt=1, max_slope_fitpnt=10, window_length=5, polyorder=2):
    try:
        # Sort data by current to ensure proper processing
        sorted_indices = np.argsort(current)
        current, intensity = current[sorted_indices], intensity[sorted_indices]

        # Normalize intensity using provided max and min PD values
        min_intensity, max_intensity = np.min(intensity), np.max(intensity)
        intensity_norm = (intensity - min_intensity) / (max_intensity - min_intensity)

        # Normalize intensity using min-max scaling
        min_intensity = np.min(intensity)
        max_intensity = np.max(intensity)
        intensity_norm = (intensity - min_intensity) / (max_intensity - min_intensity)

        # Apply Savitzky-Golay smoothing to normalized intensity
        smoothed_intensity_norm = savgol_filter(intensity_norm, window_length=window_length, polyorder=polyorder)

        # Compute differentials on normalized & smoothed data
        smoothed_dI_dC_norm = np.gradient(smoothed_intensity_norm, current)
        smoothed_d2I_dC2_norm = np.gradient(smoothed_dI_dC_norm, current)

        # Filter the data to only consider LDI between 2 and 30 mA for Gaussian fitting
        mask = (current >= 2) & (current <= 30)
        if not np.any(mask):
            print("Warning: No data points in the 2-30 mA range.")
            return None, None

        current_masked = current[mask]
        smoothed_d2I_dC2_norm_masked = smoothed_d2I_dC2_norm[mask]

        # Fit Gaussian to the smoothed second differential
        p0 = [np.max(smoothed_d2I_dC2_norm_masked), np.median(current_masked), np.std(current_masked)]
        popt, pcov = curve_fit(gaussian, current_masked, smoothed_d2I_dC2_norm_masked, p0=p0)
        median_x = popt[1]  # Extract median x from Gaussian fit

        # Handle fitting errors
        if not (2 <= median_x <= 30):
            print("Warning: Gaussian fit unable to find reasonable split point. ")
            median_x = 12.5
        elif np.any(np.diag(pcov) > 0.1):  # Adjust threshold as needed
            print("Warning: Abnormal LI curve detected due to high error in Gaussian fit.")
            return None, None

        # Split data at median_x
        left_side = current[current <= median_x]
        right_side_mask = (current > median_x + min_slope_fitpnt) & (
            current < median_x + max_slope_fitpnt
        )  # Only Fit Right Slope after a few current pnts after ITH, and up to a certain current maximum above ITH.
        right_side = current[right_side_mask]
        intensity_norm_left = intensity_norm[current <= median_x]
        intensity_norm_right = intensity_norm[right_side_mask]

        # Check if either side is empty
        if len(left_side) == 0 or len(right_side) == 0:
            print("Warning: No reasonable I_th detected within bounds.")
            return None, None

        # Fit linear regression to both segments
        slope_left, intercept_left, _, _, _ = linregress(left_side, intensity_norm_left)
        slope_right, intercept_right, _, _, _ = linregress(right_side, intensity_norm_right)

        # Compute intersection point
        intersection_x = (intercept_right - intercept_left) / (slope_left - slope_right)
        ith_value = intersection_x  # No rounding

        # Final evaluation check for ITH value
        if not (2 <= ith_value <= 30):
            print("Warning: Computed ITH value outside valid bounds (2-30 mA). Returning None.")
            return None, slope_right

        return ith_value, slope_right
    except Exception as e:
        print(f"Error: {e}")
        return None, None


# Gaussian model for fitting
def gaussian(x, a, x0, sigma):
    return a * np.exp(-((x - x0) ** 2) / (2 * sigma**2))


def find_ith_value_labview(intensity, current):
    # try:
    # 1) Trim data to only include values >2 and <=35 mA
    mask_trimmed = (current > 2) & (current <= 35)
    if not np.any(mask_trimmed):
        print("Warning: No data points between 2 and 35 mA.")
        return 0, 0
    current = current[mask_trimmed]
    intensity = intensity[mask_trimmed]

    # 2) Interpolate to double resolution (spacing of 0.5 mA)
    current_interp = np.arange(np.min(current), np.max(current) + 0.1, 0.5)
    intensity_interp = np.interp(current_interp, current, intensity)

    current = current_interp
    intensity = intensity_interp

    # 3) Normalize intensity using min-max scaling (first operation)
    min_intensity = np.min(intensity)
    max_intensity = np.max(intensity)
    intensity_norm = (intensity - min_intensity) / (max_intensity - min_intensity)

    # 4) Sort data by current to ensure proper processing
    sorted_indices = np.argsort(current)
    current = current[sorted_indices]
    intensity_norm = intensity_norm[sorted_indices]

    # 5) Apply Chebyshev high-pass filter (order 2, ripple 0.1 dB, bandpass 0.15–0.45)
    b, a = cheby1(N=2, rp=0.1, Wn=[0.15, 0.45], btype="bandpass", fs=1)
    filtered_intensity = filtfilt(b, a, intensity_norm)

    # 5a) Initial linear fit via least absolute residuals on filtered data using QuantReg
    X = sm.add_constant(current)  # Adds intercept term
    model = sm.QuantReg(filtered_intensity, X)
    res = model.fit(q=0.5)
    slope_left = res.params[1]
    intercept_left = res.params[0]
    initial_abs_residual_total = np.mean(np.abs(filtered_intensity - res.predict(X)))

    print(initial_abs_residual_total)
    if initial_abs_residual_total > 0.001:
        print("Warning: Initial L1 fit residual too high.")
        return 0, 0

    # 6) First Savitzky-Golay smoothing (5,1)
    smoothed_intensity = savgol_filter(filtered_intensity, window_length=5, polyorder=1)

    # 7) Second Savitzky-Golay smoothing before differential (3,2)
    smoothed_intensity = savgol_filter(smoothed_intensity, window_length=3, polyorder=2)

    # 8) Compute first derivative (renamed to dL_dI)
    dL_dI = np.gradient(smoothed_intensity, current)

    # 9) Smooth first derivative (6,2)
    smoothed_dL_dI = savgol_filter(dL_dI, window_length=6, polyorder=2)

    # 10) Compute second derivative (renamed to d2L_dI2)
    d2L_dI2 = np.gradient(smoothed_dL_dI, current)

    # 11) Smooth second derivative (6,2)
    smoothed_d2L_dI2 = savgol_filter(d2L_dI2, window_length=6, polyorder=2)

    # 11a) Set negative second derivative values to zero (LabVIEW-like behavior)
    smoothed_d2L_dI2[smoothed_d2L_dI2 < 0] = 0

    # 12) Normalize second derivative and add 0.01
    max_d2L_dI2 = np.max(smoothed_d2L_dI2)
    if max_d2L_dI2 == 0:
        print("Warning: Second derivative all zero after zeroing negatives.")
        return 0, 0
    d2L_dI2_ready = (smoothed_d2L_dI2 / max_d2L_dI2) + 0.01

    # 13) Least Absolute Residuals fitting with initial conditions and bounds
    initial_guess = [1, 11, np.std(current)]
    bounds = [
        (0, np.inf),  # Bounds for parameter a
        (7, np.max(current) - 3),  # Bounds for parameter x0
        (0, 3),  # Bounds for parameter sigma
    ]
    popt, _ = least_absolute_residuals_fit(current, d2L_dI2_ready, gaussian, initial_guess, bounds)
    median_x = popt[1]

    # 14) Validate split point
    if not (2 <= median_x <= 35):
        print("Warning: Gaussian fit split point out of bounds. Using default 12.5 mA.")
        median_x = 12.5

    # 15) Linear fitting on left segment (with gradient bound)
    left_mask = current <= median_x
    if not np.any(left_mask):
        print("Warning: No data points on the left segment for fitting.")
        return 0, 0

    current_left = current[left_mask]
    intensity_left = intensity_norm[left_mask]

    popt_left, _ = curve_fit(linear_model, current_left, intensity_left, bounds=([0.001, -np.inf], [np.inf, np.inf]))
    slope_left, intercept_left = popt_left

    # 16) Linear fitting on right segment (no bounds)
    right_mask = current > median_x
    if not np.any(right_mask):
        print("Warning: No data points on the right segment for fitting.")
        return 0, 0

    current_right = current[right_mask]
    intensity_right = intensity_norm[right_mask]

    if len(current_right) < 10:
        print("Warning: Fewer than 10 data points for stimulated emission fit.")
        return 0, 0

    popt_right, _ = curve_fit(linear_model, current_right, intensity_right)
    slope_efficiency, intercept_right = popt_right

    fitted_right = linear_model(current_right, *popt_right)
    mse_right = np.mean((intensity_right - fitted_right) ** 2)

    print(mse_right)
    if mse_right > 0.001:
        print("Warning: High MSE in stimulated emission fit.")
        return 0, 0

    # 17) Compute intersection (I_th)
    ith_value = (intercept_right - intercept_left) / (slope_left - slope_efficiency)
    if not (2 <= ith_value <= 35):
        print("Warning: Computed I_th outside bounds. Returning None.")
        return 0, 0

    return ith_value, slope_efficiency


def evaluate_ITH_on_rawsweep(df_raw_sweeps, touchdown_number, sampling_rate=1):
    df_raw_sweeps["ITH"] = np.nan
    for touchdown in range(
        sampling_rate, touchdown_number * sampling_rate + 1, sampling_rate
    ):  # accounts for selecting every nth touchdown if sampling.
        specific_data = df_raw_sweeps[df_raw_sweeps["TOUCHDOWN"] == touchdown]
        if not specific_data.empty and "NO LASER" not in specific_data["FLAG"].values:
            ith_value, slope_efficiency = find_ith_value_labview(
                specific_data["PD"].values,
                specific_data["LDI_mA"].values,
            )
            if ith_value != 0:
                df_raw_sweeps.loc[df_raw_sweeps["TOUCHDOWN"] == touchdown, "ITH"] = ith_value
    return df_raw_sweeps


def generate_ITH_device_summary_table(raw_sweeps):

    # Create COD Summary table with POT_FAILMODE
    device_summary = (
        (
            raw_sweeps.groupby("TE_LABEL").agg(
                {
                    "WAFER_ID": "first",
                    "MACH": "first",
                    "TOUCHDOWN": "first",
                    "ITH": "first",
                    "TYPE": "first",
                    "X_UM": "first",
                    "Y_UM": "first",
                    "FLAG": "first",
                }
            )
        )
        .reset_index()
        .sort_values("TOUCHDOWN")
    )

    return device_summary


annotated_sweeps_tables = []


device_summary_tables = []


wafer_summary_tables = []


for df_raw_sweeps, num_devices, sampling_rate, wafer_code in zip(
    raw_sweeps_tables, device_numbers, sampling_rates, wafer_codes
):
    # Apply the NO LASER flag function
    df_raw_sweeps = flag_no_laser_touchdowns(df_raw_sweeps)
    # print(f"\nflagged:\n {df_raw_sweeps}")

    # Run ITH evaluations
    df_raw_sweeps = evaluate_ITH_on_rawsweep(df_raw_sweeps, num_devices, sampling_rate)
    # print(f"\nannotated:\n {df_raw_sweeps}")
    annotated_sweeps_tables.append(df_raw_sweeps)
    # df_raw_sweeps.to_csv(EXPORTS_FILEPATH / f"{ANALYSIS_RUN_NAME}_{wafer_code}_raw_sweeps.csv", index=False)

    # Device Summary
    device_summary = generate_ITH_device_summary_table(df_raw_sweeps)
    # print(f"\ndevice summary:\n {device_summary}")
    device_summary_tables.append(device_summary)
    device_summary.to_csv(EXPORTS_FILEPATH / f"{ANALYSIS_RUN_NAME}_{wafer_code}_device_summary.csv", index=False)

# print(annotated_sweeps_tables[0].head(1000))

# print(device_summary_tables[0].head(1000))
# print(len(device_summary_tables))

# Concatenate all dataframes together
df_combined = pd.concat(device_summary_tables, ignore_index=True)
df_combined.to_csv(EXPORTS_FILEPATH / f"{ANALYSIS_RUN_NAME}_device_combined_summary.csv", index=False)

# Display the first 10 rows of the combined dataframe
# print(df_combined.head(10))

0.0008731437276483298
2.2574539834284877e-06
0.0009145692394722243
2.86357953759205e-05
0.0008810003739885495
8.372825356260545e-06
0.0008534878434638233
5.724036031654619e-06
0.0008087252382313131
5.759119542506788e-06
0.0008277020725579558
4.259888201568181e-06
0.0008085335683683614
3.846608894639682e-06
0.000868165341482489
3.7113987947258524e-06
0.0008833977455428158
5.424060626892032e-06
0.0008779046642819822
4.499789639785524e-06
0.0008860664053302552
4.503549483407212e-06
0.0008640016799851126
4.3114665451855275e-06
0.0008321530536691177
4.154779694887436e-06
0.0008108947779085842
3.6732867138594526e-06
0.0008293030099635619
6.354452643144125e-06
0.0008066921434381432
3.907550742131846e-06
0.0008261367976702505
6.246465018449108e-06
0.0008646782436562801
4.690851176250807e-06
0.0007942016168629002
1.424599925824072e-06
0.0008578473412061647
3.4707836390993637e-06
0.0009018511108209389
5.3486316569471586e-06
0.0008376962273484996
3.277720568838736e-06
0.0008345487693768067
3.7461

: 

# Raw Sweep Plotting

In [1]:
# Function to plot PD/LDI and Vf/LDI for a specific laser and wafer
def plot_specific_touchdown(df_raw_sweeps, wafer_code, touchdown, pnt_size):
    specific_data = df_raw_sweeps[(df_raw_sweeps["WAFER_ID"] == wafer_code) & (df_raw_sweeps["TOUCHDOWN"] == touchdown)]

    if specific_data.empty:
        print(f"No data found for Wafer Code: {wafer_code} and TOUCHDOWN: {touchdown}")
        return

    fig, ((ax1, ax2), (ax3, ax4), (ax5, ax6)) = plt.subplots(3, 2, figsize=(18, 15))

    # Plot PD/LDI
    ax1.scatter(specific_data["LDI_mA"], specific_data["PD"], s=pnt_size, color="blue")
    ax1.set_title(f"{wafer_code}: Scatter Plot of PD vs LDI_mA for TOUCHDOWN {touchdown}")
    ax1.set_xlabel("LDI_mA")
    ax1.set_ylabel("PD")
    ax1.grid(True)

    # Plot dP/dI
    ax3.scatter(specific_data["LDI_mA"], specific_data["dP/dI"], s=pnt_size, color="blue")
    ax3.set_title(f"{wafer_code}: Scatter Plot of dP/dI for TOUCHDOWN {touchdown}")
    ax3.set_xlabel("LDI_mA")
    ax3.set_ylabel("dP/dI")
    ax3.grid(True)

    # Plot d2P/dI2
    ax5.scatter(specific_data["LDI_mA"], specific_data["d2P/dI2"], s=pnt_size, color="blue")
    ax5.set_title(f"{wafer_code}: Scatter Plot of d2P/dI2 for TOUCHDOWN {touchdown}")
    ax5.set_xlabel("LDI_mA")
    ax5.set_ylabel("d2P/dI2")
    ax5.grid(True)

    # Plot Vf/LDI
    ax2.scatter(specific_data["LDI_mA"], specific_data["Vf"], s=pnt_size, color="green")
    ax2.set_title(f"{wafer_code}: Scatter Plot of Vf vs LDI_mA for TOUCHDOWN {touchdown}")
    ax2.set_xlabel("LDI_mA")
    ax2.set_ylabel("Vf")
    ax2.grid(True)

    # Plot dV/dI
    ax4.scatter(specific_data["LDI_mA"], specific_data["dV/dI"], s=pnt_size, color="green")
    ax4.set_title(f"{wafer_code}: Scatter Plot of dV/dI vs LDI_mA for TOUCHDOWN {touchdown}")
    ax4.set_xlabel("LDI_mA")
    ax4.set_ylabel("dV/dI")
    ax4.grid(True)

    # Plot d2V/dI2
    ax6.scatter(specific_data["LDI_mA"], specific_data["d2V/dI2"], s=pnt_size, color="green")
    ax6.set_title(f"{wafer_code}: Scatter Plot of d2V/dI2 vs LDI_mA for TOUCHDOWN {touchdown}")
    ax6.set_xlabel("LDI_mA")
    ax6.set_ylabel("d2V/dI2")
    ax6.grid(True)

    plt.tight_layout()

    plt.show()


def gaussian(x, a, mu, sigma):
    """Gaussian function for curve fitting."""
    return a * np.exp(-((x - mu) ** 2) / (2 * sigma**2))


def plot_with_smoothing_and_normalization(
    df_raw_sweeps, wafer_code, touchdown, window_length=5, polyorder=2, max_slope_fitpnt=20
):
    specific_data = df_raw_sweeps[(df_raw_sweeps["WAFER_ID"] == wafer_code) & (df_raw_sweeps["TOUCHDOWN"] == touchdown)]

    if specific_data.empty:
        print(f"No data found for Wafer Code: {wafer_code} and TOUCHDOWN: {touchdown}")
        return

    # Sort data by LDI to ensure proper plotting
    specific_data = specific_data.sort_values(by="LDI_mA")

    # Normalize PD using min-max scaling
    min_PD = specific_data["PD"].min()
    max_PD = specific_data["PD"].max()
    specific_data["PD_norm"] = (specific_data["PD"] - min_PD) / (max_PD - min_PD)

    # Apply Savitzky-Golay smoothing to normalized PD
    smoothed_PD_norm = savgol_filter(specific_data["PD_norm"], window_length=window_length, polyorder=polyorder)

    # Compute differentials on normalized & smoothed data
    dP_dI_norm = np.gradient(specific_data["PD_norm"], specific_data["LDI_mA"])
    smoothed_dP_dI_norm = np.gradient(smoothed_PD_norm, specific_data["LDI_mA"])

    d2P_dI2_norm = np.gradient(dP_dI_norm, specific_data["LDI_mA"])
    smoothed_d2P_dI2_norm = np.gradient(smoothed_dP_dI_norm, specific_data["LDI_mA"])

    # Fit Gaussian to the smoothed second differential
    try:
        mask = (specific_data["LDI_mA"] >= 2) & (specific_data["LDI_mA"] <= 30)
        current_masked = specific_data["LDI_mA"][mask]
        smoothed_d2P_dI2_norm_masked = smoothed_d2P_dI2_norm[mask]

        p0 = [np.max(smoothed_d2P_dI2_norm_masked), np.median(current_masked), np.std(current_masked)]
        popt, _ = curve_fit(gaussian, current_masked, smoothed_d2P_dI2_norm_masked, p0=p0)
        gaussian_fit = gaussian(specific_data["LDI_mA"], *popt)
        median_x = popt[1]
    except:
        median_x = np.nan

    fig, axes = plt.subplots(3, 2, figsize=(18, 20))  # Extra row for line fitting plot

    # Normalized PD vs LDI (Unsmoothed)
    axes[0, 0].plot(specific_data["LDI_mA"], specific_data["PD_norm"], color="blue", label="Normalized PD (Raw)")
    axes[0, 0].set_title(f"{wafer_code}: PD vs LDI (Normalized, Unsmoothed)")
    axes[0, 0].set_xlabel("LDI_mA")
    axes[0, 0].set_ylabel("Normalized PD")
    axes[0, 0].grid(True)

    # Normalized PD vs LDI (Smoothed)
    axes[0, 1].plot(specific_data["LDI_mA"], smoothed_PD_norm, color="red", label="Normalized PD (Smoothed)")
    axes[0, 1].set_title(f"{wafer_code}: PD vs LDI (Normalized & Smoothed)")
    axes[0, 1].set_xlabel("LDI_mA")
    axes[0, 1].set_ylabel("Normalized PD")
    axes[0, 1].grid(True)

    # Normalized dP/dI (Unsmoothed)
    axes[1, 0].plot(specific_data["LDI_mA"], dP_dI_norm, color="blue", label="dP/dI (Raw)")
    axes[1, 0].set_title(f"{wafer_code}: dP/dI (Normalized, Unsmoothed)")
    axes[1, 0].set_xlabel("LDI_mA")
    axes[1, 0].set_ylabel("dP/dI")
    axes[1, 0].grid(True)

    # Normalized dP/dI (Smoothed)
    axes[1, 1].plot(specific_data["LDI_mA"], smoothed_dP_dI_norm, color="red", label="dP/dI (Smoothed)")
    axes[1, 1].set_title(f"{wafer_code}: dP/dI (Normalized & Smoothed)")
    axes[1, 1].set_xlabel("LDI_mA")
    axes[1, 1].set_ylabel("dP/dI")
    axes[1, 1].grid(True)

    # Normalized d2P/dI2 (Unsmoothed)
    axes[2, 0].plot(specific_data["LDI_mA"], d2P_dI2_norm, color="blue", label="d2P/dI2 (Raw)")
    axes[2, 0].set_title(f"{wafer_code}: d2P/dI2 (Normalized, Unsmoothed)")
    axes[2, 0].set_xlabel("LDI_mA")
    axes[2, 0].set_ylabel("d2P/dI2")
    axes[2, 0].grid(True)

    # Normalized d2P/dI2 (Smoothed) + Gaussian Fit
    axes[2, 1].plot(specific_data["LDI_mA"], smoothed_d2P_dI2_norm, color="red", label="d2P/dI2 (Smoothed)")
    axes[2, 1].plot(specific_data["LDI_mA"], gaussian_fit, color="purple", linestyle="dashed", label="Gaussian Fit")

    # Mark median_x with a vertical line
    if not np.isnan(median_x):
        axes[2, 1].axvline(median_x, color="black", linestyle="dotted", label=f"Median: {median_x:.3f}")

    axes[2, 1].set_title(f"{wafer_code}: d2P/dI2 (Normalized & Smoothed) + Gaussian Fit")
    axes[2, 1].set_xlabel("LDI_mA")
    axes[2, 1].set_ylabel("d2P/dI2")
    axes[2, 1].grid(True)
    axes[2, 1].legend()

    # ------------------- LINEAR FITTING SECTION (Separate Figure) -------------------
    if not np.isnan(median_x):
        # Split data at median_x
        left_side = specific_data[specific_data["LDI_mA"] <= median_x]
        right_side_mask = (specific_data["LDI_mA"] > median_x) & (specific_data["LDI_mA"] < max_slope_fitpnt)
        right_side = specific_data[right_side_mask]

        if not left_side.empty and not right_side.empty:
            # Fit linear regression to both segments
            slope_left, intercept_left, _, _, _ = linregress(left_side["LDI_mA"], left_side["PD_norm"])
            slope_right, intercept_right, _, _, _ = linregress(right_side["LDI_mA"], right_side["PD_norm"])

            # Generate fitted lines
            fit_left = slope_left * left_side["LDI_mA"] + intercept_left
            fit_right = slope_right * right_side["LDI_mA"] + intercept_right

            # Compute intersection point
            intersection_x = (intercept_right - intercept_left) / (slope_left - slope_right)
            intersection_y = slope_left * intersection_x + intercept_left

        # Create a separate figure for linear fits
        plt.figure(figsize=(10, 6))

        # Plot original normalized PD data as scatter points
        plt.scatter(
            specific_data["LDI_mA"], specific_data["PD_norm"], color="green", marker="+", alpha=1, label="Original Data"
        )

        # Plot linear fits
        plt.plot(left_side["LDI_mA"], fit_left, color="blue", label=f"Left Fit (Slope={slope_left:.3f})")
        plt.plot(right_side["LDI_mA"], fit_right, color="red", label=f"Right Fit (Slope={slope_right:.3f})")

        # Mark the intersection point
        plt.scatter(
            intersection_x,
            intersection_y,
            color="black",
            marker="x",
            s=200,
            label=f"Intersection at LDI={intersection_x:.3f}",
        )

        # Labels and title
        plt.title(f"{wafer_code}: Linear Fits Split at Median")
        plt.xlabel("LDI_mA")
        plt.ylabel("Normalized PD")
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.show()


# INPUT THE DESIRED PROFILE TO EXAMINE HERE
# Define the specific wafer code and TOUCHDOWN number
WAFER_CODE = "QCHZZ"
TOUCHDOWN = 20

# Find the correct dataframe where the wafer code matches the input
df_raw_sweeps = None
for df in annotated_sweeps_tables:
    if df["WAFER_ID"].iloc[0] == WAFER_CODE:
        df_raw_sweeps = df
        break

if df_raw_sweeps is not None:
    # Plot for the specified touchdown number
    # plot_specific_touchdown(df_raw_sweeps, WAFER_CODE, TOUCHDOWN, pnt_size=5)
    plot_with_smoothing_and_normalization(df_raw_sweeps, WAFER_CODE, TOUCHDOWN, window_length=5, polyorder=2)
    ITH_value = find_ith_value(df_raw_sweeps, "QCHZZ", TOUCHDOWN)
    print(f"ITH value: {ITH_value}")
else:
    print(f"No data found for Wafer Code: {WAFER_CODE}")

NameError: name 'annotated_sweeps_tables' is not defined