In [1]:
import pandas as pd
import os
import numpy as np
from nptdms import TdmsFile
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors

In [2]:
# Paths
base_path = "/Users/shubhangchauhan/Desktop/Procyon Data/TaehwaV3"
date_folders = ["11_06_2025", "10_06_2025"]

# Flags:
USE_DBSCAN_FILTER = True  # Set to False for no DBSCAN Filter
USE_MTPV_ANALYSIS = True  # Set to False for MTPA

# Torque and Speed grouping centers
torque_centers = np.arange(0, 50, 5) 
speed_centers = np.arange(3000,7000,500)

# Voltage threshold
Vdc = 51
Vmax_peak = Vdc*0.666
threshold_voltage = 0.9 * Vmax_peak

# Functions
def assign_torque_group(torque):
    if pd.isna(torque):
        return None
    for center in torque_centers:
        if center - 0.2 <= torque <= center + 0.2:
            return center
    return None

def assign_speed_group(speed):
    if pd.isna(speed):
        return None
    for center in speed_centers:
        if center - 20 <= speed <= center + 20:
            return center
    return None

In [3]:
# Final collected data
final_grouped = []

for date in date_folders:
    date_path = os.path.join(base_path, date)
    filtered_data = []

    for filename in os.listdir(date_path): 
        if filename.endswith(".tdms"):
            file_path = os.path.join(date_path, filename) 
            try:
                tdms_file = TdmsFile.read(file_path)
                df = tdms_file.as_dataframe()

                # Clean column names
                df.columns = df.columns.str.replace(' ','_')
                df.columns = df.columns.str.replace(f"/'Calculated_Data'/", "", regex=True)
                df.columns = df.columns.str.replace("'", "", regex=False)
                df.EFFI1 = df.EFFI1 * 100.0 

                # Required columns check
                required_cols = ["CAN_Id_Ref", "CAN_Iq_Ref", "Torque", "CAN_Vstator_V","CAN_Iq_in", "CAN_Id_in","CAN_Ld_uH", "CAN_Lq_uH"]

                if all(col in df.columns for col in required_cols):
                    original_len = len(df)

                    # Filter rows where both Id_ref and Iq_reg = 0
                    df_filtered = df[(df["CAN_Id_Ref"] != 0) & (df["CAN_Iq_Ref"] != 0)]

                    # Remove rows where Iq_ref < 0 OR Id_ref > 0 or Iq_in < 0 or Id_in > 0
                    df_filtered = df_filtered[~((df_filtered["CAN_Iq_Ref"] < 0) | (df_filtered["CAN_Id_Ref"] > 0) | (df_filtered["CAN_Iq_in"] < 0) | (df_filtered["CAN_Id_in"] > 0))]

                    df_filtered["Torque"] = pd.to_numeric(df_filtered["Torque"], errors='coerce').abs()
                    df_filtered = df_filtered[~df_filtered["Torque"].isna()]

                    # Make Speed positive and filter if column exists
                    if "Speed" in df_filtered.columns:
                        df_filtered["Speed"] = pd.to_numeric(df_filtered["Speed"], errors='coerce').abs()
                        if USE_MTPV_ANALYSIS:
                            df_filtered = df_filtered[df_filtered["Speed"] > 3000]
                        else:
                            df_filtered = df_filtered[df_filtered["Speed"] <= 3000]

                        # Remove rows where Speed changes between consecutive rows
                        df_filtered["Speed_Diff"] = df_filtered["Speed"].diff()
                        df_filtered = df_filtered[(df_filtered["Speed_Diff"] == 0) | (df_filtered["Speed_Diff"].isna())]
                        df_filtered.drop(columns=["Speed_Diff"], inplace=True)

                    # Filter for specific Ld and Lq values
                    # Ensure values are numeric
                    df_filtered["CAN_Ld_uH"] = pd.to_numeric(df_filtered["CAN_Ld_uH"], errors="coerce")
                    df_filtered["CAN_Lq_uH"] = pd.to_numeric(df_filtered["CAN_Lq_uH"], errors="coerce")

                    df_filtered = df_filtered.dropna(subset=["CAN_Ld_uH", "CAN_Lq_uH"])
                    df_filtered = df_filtered[(df_filtered["CAN_Ld_uH"] == 46.3) & (df_filtered["CAN_Lq_uH"] == 73.48)]

                    # Filter based on comparision
                    for col in ["CAN_Iq_in", "CAN_Iq_Ref", "CAN_Id_in", "CAN_Id_Ref"]:
                        df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce')

                    df_filtered = df_filtered.dropna(subset=["CAN_Iq_in", "CAN_Iq_Ref", "CAN_Id_in", "CAN_Id_Ref"])
                    df_filtered = df_filtered[
                        ((df_filtered["CAN_Iq_in"] - df_filtered["CAN_Iq_Ref"]).abs() <= 10) &
                        ((df_filtered["CAN_Id_in"] - df_filtered["CAN_Id_Ref"]).abs() <= 10)
                    ]

                    # Filter based on bandwidth
                    if "CAN_Bandwidth" in df_filtered.columns:
                        df_filtered = df_filtered[df_filtered["CAN_Bandwidth"] == 500]

                    rows_removed = original_len - len(df_filtered)
                    if rows_removed > 0:
                        print(f"Removed {rows_removed} rows in {filename}")

                    df_filtered["source_file"] = filename
                    filtered_data.append(df_filtered)
                    print(f"Filtered using columns: {', '.join(required_cols)}")
                else:
                    print(f"Required columns not found in {filename}")

            except Exception as e:
                print(f"Failed to process {filename}: {e}")

    if filtered_data:
        combined_df = pd.concat(filtered_data, ignore_index=True)

        # Group and classify
        combined_df["Torque_Group"] = combined_df["Torque"].apply(assign_torque_group)
        if USE_MTPV_ANALYSIS:
            combined_df["Speed_Group"] = combined_df["Speed"].apply(assign_speed_group)

        # MTPA vs MTPV Filtering 
        if USE_MTPV_ANALYSIS:
            combined_df = combined_df[combined_df["CAN_Vstator_V"].abs() > threshold_voltage]
        else:
            combined_df = combined_df[combined_df["CAN_Vstator_V"].abs() < threshold_voltage]

        combined_df = combined_df.sort_values(by=["Torque_Group", "Torque", "CAN_Vstator_V"])
        combined_df.insert(0, "Index", range(1, len(combined_df) + 1))

        # Compute Pout, Pin, Efficiency, and Filter Efficiency > 1 
        combined_df["Torque"] = pd.to_numeric(combined_df["Torque"], errors='coerce')
        combined_df["Speed"] = pd.to_numeric(combined_df.get("Speed", np.nan), errors='coerce')
        combined_df["Udc4"] = pd.to_numeric(combined_df.get("Udc4", np.nan), errors='coerce')
        combined_df["Idc4"] = pd.to_numeric(combined_df.get("Idc4", np.nan), errors='coerce')

        combined_df["Pout"] = combined_df["Torque"] * combined_df["Speed"] * 2 * np.pi / 60
        combined_df["Pin"] = combined_df["Udc4"] * combined_df["Idc4"]
        combined_df["Efficiency"] = np.where(combined_df["Pin"] != 0, combined_df["Pout"] / combined_df["Pin"], np.nan)

        combined_df["Efficiency"] = combined_df["Efficiency"].abs()
        combined_df = combined_df[(combined_df["Efficiency"] <= 1) & combined_df["Efficiency"].notna() & np.isfinite(combined_df["Efficiency"])]

        if USE_MTPV_ANALYSIS:
            combined_df["CAN_Vstator_V"] = pd.to_numeric(combined_df["CAN_Vstator_V"], errors="coerce")
            combined_df["Torque_per_Volt"] = np.where(combined_df["CAN_Vstator_V"] != 0,combined_df["Torque"] / combined_df["CAN_Vstator_V"],np.nan)
        elif "CAN_Istator_A" in combined_df.columns:
            combined_df["CAN_Istator_A"] = pd.to_numeric(combined_df["CAN_Istator_A"], errors="coerce")
            combined_df["Torque_per_Amp"] = np.where(combined_df["CAN_Istator_A"] != 0,combined_df["Torque"] / combined_df["CAN_Istator_A"],np.nan)
        else:
            print(f"Missing required columns for MTPA/MTPV calculation in {date}")

        # Save per-date grouped CSV
        grouped_csv_path = f"/Users/shubhangchauhan/Desktop/{date}_torque_grouped.csv"
        combined_df.to_csv(grouped_csv_path, index=False)
        print(f"Saved per-date grouped CSV: {grouped_csv_path}")

        final_grouped.append(combined_df)

Removed 1355 rows in 15nmSweep_11_06_2020_14_41 0.tdms
Filtered using columns: CAN_Id_Ref, CAN_Iq_Ref, Torque, CAN_Vstator_V, CAN_Iq_in, CAN_Id_in, CAN_Ld_uH, CAN_Lq_uH
Removed 696 rows in 15nmSweep_11_06_2020_14_30 0.tdms
Filtered using columns: CAN_Id_Ref, CAN_Iq_Ref, Torque, CAN_Vstator_V, CAN_Iq_in, CAN_Id_in, CAN_Ld_uH, CAN_Lq_uH
Removed 1580 rows in 15nmSweep_11_06_2020_15_12 0.tdms
Filtered using columns: CAN_Id_Ref, CAN_Iq_Ref, Torque, CAN_Vstator_V, CAN_Iq_in, CAN_Id_in, CAN_Ld_uH, CAN_Lq_uH
Removed 5777 rows in AngleCalibration_11_06_2020_10_04 0.tdms
Filtered using columns: CAN_Id_Ref, CAN_Iq_Ref, Torque, CAN_Vstator_V, CAN_Iq_in, CAN_Id_in, CAN_Ld_uH, CAN_Lq_uH
Removed 271 rows in 20nmSweep1_11_06_2020_17_57 0.tdms
Filtered using columns: CAN_Id_Ref, CAN_Iq_Ref, Torque, CAN_Vstator_V, CAN_Iq_in, CAN_Id_in, CAN_Ld_uH, CAN_Lq_uH
Removed 8297 rows in 25nmSweep1_11_06_2020_19_17 0.tdms
Filtered using columns: CAN_Id_Ref, CAN_Iq_Ref, Torque, CAN_Vstator_V, CAN_Iq_in, CAN_Id_in, 

In [4]:
# Combine all data
if final_grouped:
    all_grouped_df = pd.concat(final_grouped, ignore_index=True)
    all_grouped_df.drop(columns=["Index"], inplace=True, errors='ignore')
    all_grouped_df.insert(0, "Index", range(1, len(all_grouped_df) + 1))

    # Adaptive DBSCAN filtering
    filtered_groups = []

    for tg, group in all_grouped_df.groupby("Torque_Group"):
        if len(group) < 10:
            continue

        features = group[["CAN_Id_in", "CAN_Iq_in"]].dropna()
        if features.empty:
            continue

        scaler = StandardScaler()
        features_scaled = scaler.fit_transform(features)

        neigh = NearestNeighbors(n_neighbors=5)
        nbrs = neigh.fit(features_scaled)
        distances, _ = nbrs.kneighbors(features_scaled)
        k_distances = distances[:, -1]
    
        #eps_val = np.percentile(k_distances, 90)
        eps_val = max(np.percentile(k_distances, 90), 0.1)


        db = DBSCAN(eps=eps_val, min_samples=5).fit(features_scaled)
        labels = db.labels_

        mask = labels != -1
        filtered = group.loc[features.index[mask]].copy()
        filtered_groups.append(filtered)

    all_grouped_df_dbscan = pd.concat(filtered_groups, ignore_index=True)
    print("Adaptive DBSCAN filtering applied for Iq_in vs Id_in by Torque group.")


   # Choose dataset based on DBSCAN flag
    final_df = all_grouped_df_dbscan if USE_DBSCAN_FILTER else all_grouped_df
    final_csv_path = "/Users/shubhangchauhan/Desktop/torque_mtpa_mtpv_all_data.csv"
    final_df.to_csv(final_csv_path, index=False)
    print(f"Final grouped CSV saved: {final_csv_path}")

    print(all_grouped_df[["Torque_Group"]].value_counts(dropna=False))

    #  Create a dataframe for max Torque_per_Amp/ Torque_per_volt per group
    if final_df is not None and USE_MTPV_ANALYSIS:
        tv_valid = final_df.dropna(subset=["Torque_Group", "Speed_Group", "Torque_per_Volt"])
        tv_valid["Torque_per_Volt"] = pd.to_numeric(tv_valid["Torque_per_Volt"], errors="coerce")
        max_indices = (tv_valid.groupby(["Torque_Group", "Speed_Group"])["Torque_per_Volt"].idxmax().dropna())
        MTPV_df = final_df.loc[max_indices].sort_values(["Torque_Group", "Speed_Group"]).reset_index(drop=True)
        MTPV_df.to_csv("/Users/shubhangchauhan/Desktop/MTPV.csv", index=False)
        print("Saved MTPV.csv")

    elif "Torque_per_Amp" in final_df.columns:
        tpa_valid = final_df.dropna(subset=["Torque_Group", "Torque_per_Amp"])
        tpa_valid["Torque_per_Amp"] = pd.to_numeric(tpa_valid["Torque_per_Amp"], errors="coerce")
        max_indices = tpa_valid.groupby("Torque_Group")["Torque_per_Amp"].idxmax()
        MTPA_df = final_df.loc[max_indices].sort_values("Torque_Group").reset_index(drop=True)
        MTPA_df.to_csv("/Users/shubhangchauhan/Desktop/MTPA.csv", index=False)
        print(" Saved MTPA.csv")
    else:
        print("Required columns missing for Torque efficiency filtering.")



Adaptive DBSCAN filtering applied for Iq_in vs Id_in by Torque group.
Final grouped CSV saved: /Users/shubhangchauhan/Desktop/torque_mtpa_mtpv_all_data.csv
Torque_Group
5.0             73918
NaN             44415
10.0             4081
15.0             1147
20.0               93
25.0               56
0.0                 5
Name: count, dtype: int64
Saved MTPV.csv


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  tv_valid["Torque_per_Volt"] = pd.to_numeric(tv_valid["Torque_per_Volt"], errors="coerce")


In [5]:
# Plots
import plotly.express as px
import plotly.io as pio
import plotly.graph_objects as go
from scipy.interpolate import griddata

# Torque vs Speed Scatter with hover info #####
fig1 = px.scatter(
    all_grouped_df,
    x="Speed",
    y="Torque",
    hover_data={
        "Speed": False,
        "Torque": False,
        "CAN_Id_in": True,
        "CAN_Iq_in": True
    },
    labels={"Speed": "Speed (RPM)", "Torque": "Torque (Nm)"},
    title="Torque vs Speed"
)
fig1.update_traces(
    hovertemplate="<br>".join([
        "Speed (RPM): %{x}",
        "Torque (Nm): %{y}",
        "Id (A): %{customdata[0]}",
        "Iq (A): %{customdata[1]}"
    ])
)
pio.write_html(fig1, file="/Users/shubhangchauhan/Desktop/torque_speed_plot.html", auto_open=False)

# Iq_in vs Id_in Scatter ########
fig2 = px.scatter(
    all_grouped_df,
    x="CAN_Id_in",
    y="CAN_Iq_in",
    title="Iq_in vs Id_in",
    hover_data={
        "Speed": True,
        "Torque": True,
        "CAN_Id_in": True,
        "CAN_Iq_in": True
    },
    labels={"CAN_Id_in": "Id_in (A)", "CAN_Iq_in": "Iq_in (A)"}
)
pio.write_html(fig2, file="/Users/shubhangchauhan/Desktop/iq_id_plot.html", auto_open=False)


# Iq_in vs Id_in with torque group for MTPA/MTPV ######

df_plot = final_df.dropna(subset=["Torque_Group", "CAN_Id_in", "CAN_Iq_in"]).copy() 
min_points = MTPV_df.copy() if USE_MTPV_ANALYSIS else MTPA_df.copy()

# Base scatter plot
fig = px.scatter(
    df_plot,
    x="CAN_Id_in",
    y="CAN_Iq_in",
    color="Torque_Group",
    title="Iq_in vs Id_in by Torque Group",
    labels={"CAN_Id_in": "Id_in (A)", "CAN_Iq_in": "Iq_in (A)", "Torque_Group": "Torque Group (Nm)"},
    opacity=0.3,
    hover_data={
        "Speed": True,
        "Torque": True,
        "CAN_Id_in": True,
        "CAN_Iq_in": True
    }
    #color_continuous_scale="Viridis"
)

point_label = "MTPV point" if USE_MTPV_ANALYSIS else "MTPA point"
# Add 'X' markers for min points
fig.add_trace(go.Scatter(
    x=min_points["CAN_Id_in"],
    y=min_points["CAN_Iq_in"],
    mode='markers+text',
    text=[point_label]*len(min_points),
    textposition="top center",
    name=f"{point_label}s",
    marker=dict(
        symbol="x",
        size=12,
        color="black",
        line=dict(width=2, color="white")
    ),
    showlegend=True
))

# Create a summary text
if USE_MTPV_ANALYSIS:
    summary_lines = [
        f"Torque {row['Torque_Group']:.1f} Nm → T/Vₛ = {row['Torque_per_Volt']:.2f} Nm/V"
        for _, row in min_points.iterrows()
    ]
else:
    summary_lines = [
        f"Torque {row['Torque_Group']:.1f} Nm → T/Iₛ = {row['Torque_per_Amp']:.2f} Nm/A"
        for _, row in min_points.iterrows()
    ]
summary_text = "<br>".join(summary_lines)

# Add annotation box
fig.add_annotation(
    text=summary_text,
    align="left",
    showarrow=False,
    xref="paper", yref="paper",
    x=1, y=1, 
    bordercolor="black",
    borderwidth=1,
    bgcolor="white",
    font=dict(size=12),
)

output_path = "/Users/shubhangchauhan/Desktop/iq_vs_id_torque_colored.html"
fig.write_html(output_path)

# # Iq vs Id for a torque group #######
# df_torque5 = final_df[all_grouped_df["Torque_Group"] == 5].dropna(subset=["CAN_Id_in", "CAN_Iq_in"]) 

# fig = px.scatter(
#     df_torque5,
#     x="CAN_Id_in",
#     y="CAN_Iq_in",
#     title="Iq_in vs Id_in (Torque Group = 5 Nm)",
#     labels={"CAN_Id_in": "Id_in (A)", "CAN_Iq_in": "Iq_in (A)"},
#     opacity=0.7,
#     hover_data={
#         "CAN_Id_in": True,
#         "CAN_Iq_in": True,
#         "CAN_Vstator_V": True,
#         "CAN_Istator_A": True
#     }
# )
# plot_path = "/Users/shubhangchauhan/Desktop/iq_vs_id_torque_5nm.html"
# fig.write_html(plot_path)

# Contour Plot Torque vs Speed ########
df_plot = all_grouped_df_dbscan.copy() if USE_DBSCAN_FILTER else all_grouped_df.copy()

# Drop rows with missing or invalid values
df_plot = df_plot.dropna(subset=["Torque", "Speed", "Efficiency"]).copy()

# Convert to percent and filter
df_plot["Efficiency(%)"] = df_plot["Efficiency"] * 100
df_plot = df_plot[(df_plot["Efficiency(%)"] > 0) & (df_plot["Efficiency(%)"] <= 100)]

# Extract x, y, z
x = df_plot["Speed"].values
y = df_plot["Torque"].values
z = df_plot["Efficiency(%)"].values

# Define a regular grid to interpolate over
xi = np.linspace(np.min(x), np.max(x), 300)
yi = np.linspace(np.min(y), np.max(y), 300)
xi_grid, yi_grid = np.meshgrid(xi, yi)

# Interpolate using linear method to avoid wild spikes
zi = griddata((x, y), z, (xi_grid, yi_grid), method='linear')

# Remove NaNs to avoid plotting garbage
zi = np.nan_to_num(zi, nan=0)

# Create contour plot
fig_contour = go.Figure(data=go.Contour(
    z=zi,
    x=xi,
    y=yi,
    colorscale='Viridis',
    contours=dict(
        coloring="heatmap",
        showlabels=True,
        labelfont=dict(size=10, color='white')
    ),
    colorbar=dict(title="Efficiency (%)")
))

fig_contour.update_layout(
    title="Efficiency Contour Plot: Torque vs Speed",
    xaxis_title="Speed (RPM)",
    yaxis_title="Torque (Nm)",
    width=1200,
    height=800
)

# Save to HTML
contour_output_path = "/Users/shubhangchauhan/Desktop/efficiency_contour_plot.html"
fig_contour.write_html(contour_output_path)

print("Plots saved as HTML.")



Plots saved as HTML.
