In [1]:
pip install scipy scikit-learn fastdtw matplotlib

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import numpy as np
import pandas as pd
import os
import glob
from scipy.fft import fft, ifft
from sklearn.metrics import mean_squared_error
from itertools import product

In [3]:
# === CONFIGURATION ===
DATA_DIR = "processed"  # Update path if needed
# # finds the top closest match in K-DTW to predict where the new leak is. 
K = 1
# Controls how flexible the DTW comparison is across time.
MAX_WARPING_WINDOW = 30
# Reduce the size of data by taking every 2nd value, helps with speed.
SUBSAMPLE_STEP = 2

In [4]:
# === FFT Smoothing ===

# Removes noise from sensor data using FFT(Fast Fourier Transform). 
def fft_smooth(signal, threshold=0.005):
    # Keeps strong signal problems (like actual gas concentration changes) and removes weak patterns (random noise). 
    fft_vals = fft(signal)
    fft_magnitudes = np.abs(fft_vals)
    # Flatten(easier to analyze and visualize) the noise to keep the meaningful pattern. 
    fft_vals[fft_magnitudes < threshold * np.max(fft_magnitudes)] = 0
    return np.real(ifft(fft_vals))

In [5]:
# === Load and Preprocess ===
def preprocess_csv(file_path):
    # Read csv file into memory (df = dataframe).
    df = pd.read_csv(file_path).dropna()
    # Keeps only the sensor columns p1 to p16.
    sensor_cols = [f"p{i}" for i in range(1, 17)]
    df = df[sensor_cols]

    # Prepares a list (smoothed) to hold cleaned data for each sensor.
    smoothed = []
    # for each sensor column
    for col in df.columns:
        # Smooths(reducing noise within dataset) the signal using the fft_smooth function. 
        series = fft_smooth(df[col].values)
        # normalizes the data (subtracts the mean and divides by standard deviation) --> makes all sensor signals comparable (same scale). 
        norm_series = (series - np.mean(series)) / np.std(series)
        smoothed.append(norm_series)
    
    # all 16 cleaned, normalized sensor signals are then combined together into one long 1D array. 
    # If each sensor has 2000 values, then all 16 sensors combined = 16 x 2000 => 32,000 values in one vector. 
    combined = np.concatenate(smoothed)
    # We use this by K-DTW for comparing tests. 
    return combined

In [6]:
def load_all_data():
    X_list, y_list, names = [], [], []
    pattern = os.path.join(DATA_DIR, "*_test-*_range.csv")
    for fp in glob.glob(pattern):
        df_all = pd.read_csv(fp).dropna()
        true_x = df_all['X']
        true_z = df_all['Z']
        vec = preprocess_csv(fp)
        X_list.append(vec)
        y_list.append(np.array([true_x, true_z]))
        names.append(os.path.basename(fp).replace(".csv", ""))

    return X_list, y_list, names

In [7]:
# # Find all the csv files in the datasets folder that start with "test-" (like test-2.csv).
# # This builds the training dataset for K-DTW. 
# def load_all_data():
#     X_data, y_coords, labels = [], [], []
#     for file in glob.glob(os.path.join(DATA_DIR, "*_test-*_range.csv")):
#         name = os.path.basename(file).replace(".csv", "")

#         # Skips any test that doesn't have a known leak coordinate (from LEAK_COORDS).
#         if name not in LEAK_COORDS:
#             continue
#         coord = LEAK_COORDS[name]
#         # For each valid test, it loads and cleans (ensures that data used to train models is accurate, consistent, and reliable) the sensor data using preprocess_csv(). 
#         vector = preprocess_csv(file)
#         # Stores the cleaned data (vector). 
#         X_data.append(vector)
#         # Stores the known [X, Z] leak location from that test. 
#         y_coords.append(coord)
#         labels.append(name)

#     # Returns all data vectors, their leak coordinates, and the test names. 
#     return X_data, y_coords, labels

In [8]:
def dtw_distance(ts1, ts2, max_warping_window = np.inf, subsample_step=1):
    a = ts1[::subsample_step]
    b = ts2[::subsample_step]
    n, m = len(a), len(b)

    if np.isfinite(max_warping_window):
        w = int(max_warping_window)
        if w < abs(n - m):
            return np.inf

        w = min(w, max(n, m))

    else:
        w = max(n, m)

    prev = np.full(m + 1, np.inf)
    curr = np.full(m + 1, np.inf)
    prev[0] = 0

    for i in range(1, n + 1):
        curr[:] = np.inf
        j0 = max(1, i - w)
        j1 = min(m, i + w)

        for j in range(j0, j1 + 1):
            cost = abs(a[i - 1] - b[j - 1])

            curr[j] = cost + min(prev[j], prev[j-1], curr[j-1])

        prev, curr = curr, prev

    return prev[m]

In [9]:
# # === DTW Distance with warping + subsampling ===

# # This function compares two time series vectors (like one test and one train sample) using DTW. 
# # Subsampling = reduces the data size. If step = 2, it uses every other point. 

# # Creates a DTW matrix to store costs of aligning each point in test(1) to each point in test(2).
# def dtw_distance(ts1, ts2, max_warping_window=np.inf, subsample_step=1):

    
#     ts1 = ts1[::subsample_step]
#     ts2 = ts2[::subsample_step]

#     # Fills the matrix by comparing each point in the two sequences.
#     # Finding the lowest-cost way to align them. 
#     n, m = len(ts1), len(ts2)
#     dtw_matrix = np.full((n + 1, m + 1), np.inf)
#     dtw_matrix[0, 0] = 0

    
#     for i in range(1, n + 1):
#         for j in range(max(1, i - int(max_warping_window)), min(m + 1, i + int(max_warping_window))):
#             cost = abs(ts1[i - 1] - ts2[j - 1])
#             dtw_matrix[i, j] = cost + min(
#                 dtw_matrix[i - 1, j],
#                 dtw_matrix[i, j - 1],
#                 dtw_matrix[i - 1, j - 1]
#             )
#     # Returns the final alignment cost, smaller means the two time series are more similar. 
#     return dtw_matrix[n, m]


# # This function uses DTW to find the closest match in time-evolving patterns, even if events (like peak gas arrival) happen at different times. 
# # Finds the minimum "cost" to make ts1 look like ts2, whcih means they are similar gas leak patterns.

In [10]:
# === K-DTW Regressor ===

# takes in a test vector (sensor data of unknown leak) and compares it to all known vectors (training data).
def k_dtw_predict(test_vec, train_vecs, train_labels, k=1, max_warping_window=np.inf, subsample_step=1):

    # Uses DTW distance to measure how similar each one is.
    distances = []
    for i, vec in enumerate(train_vecs):
        dist = dtw_distance(test_vec, vec, max_warping_window, subsample_step)
        distances.append((dist, train_labels[i]))

    # Sorts them to find the top K closest matches. 
    distances.sort(key=lambda x: x[0])
    nearest = [label for _, label in distances[:k]]

    # Averages their known coordinates [X, Z] to predict the leak location of the test. 
    return np.mean(nearest, axis=0)

In [None]:
# OPTIMIZATION (SKIP WHEN DONE)
K_values = [1, 3, 5]
window_values = [np.inf, 10, 30, 50]
subsample_steps = [1, 2, 4]

X_all, y_all, labels = load_all_data()

records = []

for K, max_warp, subsample in product(K_values, window_values, subsample_steps):
    errs = []
    for i in range(len(X_all)):
        test_vec = X_all[i]
        test_y = np.array(y_all[i])
        train_vecs = [v for j, v in enumerate(X_all) if j != i]
        train_coords = [c for j, c in enumerate(y_all) if j != i]
        pred = k_dtw_predict(
            test_vec,
            train_vecs,
            train_coords,
            k = K,
            max_warping_window = max_warp,
            subsample_step = subsample
        )
        err_pct = np.abs((pred - test_y) / test_y) * 100
        errs.append(err_pct)

    errs = np.vstack(errs)

    records.append({
        "K": K,
        "max_warping_window": max_warp,
        "subsample_step": subsample,
        "mean_error_X_%": errs[:,0].mean(),
        "mean_error_Z_%": errs[:,1].mean(),
        "mean_error_overall_%": errs.mean()
    })


  norm_series = (series - np.mean(series)) / np.std(series)
  norm_series = (series - np.mean(series)) / np.std(series)


In [1]:
results_df = pd.DataFrame(records)
results_df.to_csv("k_dtw_hyperparameter_results.csv", index = False)
print("Hyperparameter search done.")

results_df.sort_values("mean_error_overall_%").head()

NameError: name 'pd' is not defined

# RUN THE MODEL

In [None]:
# === Full Leave-One-Out Pipeline ===
def run_pipeline():

    # Loads all the sensor vectors and known leak locations.
    X_all, y_all, label_names = load_all_data()

    results = []

    # Loops through each sample and treats it like a test case (one at a time). 
    for i in range(len(X_all)):
        test_X = X_all[i]
        test_y = np.array(y_all[i])
        test_label = label_names[i]
        #print (test_X.shape(), test_y.shape())

        # The rest of the data is used as the training set. 
        train_X = [x for j, x in enumerate(X_all) if j != i]
        train_y = [y for j, y in enumerate(y_all) if j != i]
        print (train_X, train_y)
        # Uses k_dtw_predict() to predict the leak location
        pred = k_dtw_predict(test_X, train_X, train_y, k=K,
                             max_warping_window=MAX_WARPING_WINDOW,
                             subsample_step=SUBSAMPLE_STEP)

        # Calculate the error between predicted and actual [X, Z]. 
        error = abs((pred - test_y)/test_y)*100
        results.append({
            "Test": test_label,
            "True_X": test_y[0], "True_Z": test_y[1],
            "Pred_X": pred[0], "Pred_Z": pred[1],
            "Error": error
        })

    # Stores the result into a table
    return pd.DataFrame(results)

In [None]:
# === Full Leave-One-Out Pipeline ===
def run_pipeline2():

    # Loads all the sensor vectors and known leak locations.
    X_all, y_all, label_names = load_all_data()

    results = []

    # Loops through each sample and treats it like a test case (one at a time). 
    i=2
    test_X = X_all[i]
    test_y = np.array(y_all[i])
    test_label = label_names[i]
        #print (test_X.shape(), test_y.shape())

        # The rest of the data is used as the training set. 
    train_X = [x for j, x in enumerate(X_all) if j != i]
    train_y = [y for j, y in enumerate(y_all) if j != i]
    print (train_X, train_y)
        # Uses k_dtw_predict() to predict the leak location
    pred = k_dtw_predict(test_X, train_X, train_y, k=K,
                             max_warping_window=MAX_WARPING_WINDOW,
                             subsample_step=SUBSAMPLE_STEP)

        # Calculate the error between predicted and actual [X, Z]. 
    error = abs((pred - test_y)/test_y)*100
    results.append({
            "Test": test_label,
            "True_X": test_y[0], "True_Z": test_y[1],
            "Pred_X": pred[0], "Pred_Z": pred[1],
            "Error": error
        })

    # Stores the result into a table
    return pd.DataFrame(results)

In [None]:
# Run pipeline and show results
results_df = run_pipeline2()
results_df

In [None]:
# import numpy as np
# import pandas as pd

# def k_dtw_predict_weighted(test_vec, train_vecs, train_labels,
#                             k=2,
#                             max_warping_window=np.inf,
#                             subsample_step=1):
# # compute DTW distances to all training series
#     dists = []
#     for vec, label in zip(train_vecs, train_labels):
#         d = dtw_distance(test_vec, vec,
#                             max_warping_window=max_warping_window,
#                             subsample_step=subsample_step)
#         dists.append((d, label))
#         # sort and pick top-k
#         dists.sort(key=lambda x: x[0])
#         topk = dists[:k]
#         ds, labs = zip(*topk)
#         ds = np.array(ds) + 1e-8 # avoid division by zero
#         weights = 1.0 / ds # closer â†’ bigger weight
#         coords = np.array([LEAK_COORDS[l] for l in labs]) # shape (k,2)
#         # weighted average of [X,Z]
#         return (weights[:,None]*coords).sum(axis=0) / weights.sum()

# def run_weighted_pipeline(k=2):
#     X_all, y_all, labels = load_all_data()
#     records = []
#     for i in range(len(X_all)):
#         test_vec = X_all[i]
#         true_coord = np.array(y_all[i])
#         test_label = labels[i]

#         train_vecs = [X_all[j] for j in range(len(X_all)) if j!=i]
#         train_labels = [labels[j] for j in range(len(labels)) if j!=i]

#         pred = k_dtw_predict_weighted(
#             test_vec, train_vecs, train_labels,
#             k=k,
#             max_warping_window=MAX_WARPING_WINDOW,
#             subsample_step=SUBSAMPLE_STEP
#         )
#         err = np.linalg.norm(pred - true_coord)
#         records.append({
#             "Test": test_label,
#             "True_X": true_coord[0], "True_Z": true_coord[1],
#             "Pred_X": pred[0], "Pred_Z": pred[1],
#             "Error": err
#         })
#     return pd.DataFrame(records)

# # Try it:
# for k in (1,2,3):
#     print(f"\n>>> Weighted k-DTW with k={k}")
#     print(run_weighted_pipeline(k=k))

**Load Data -> load_all_data() = reads and prepares test vectors and known [X, Z]**

**DTW Comparison -> dtw_distance() = finds pattern similarity between sensor data**

**Predict location -> k_dtw_predict() = Uses the top-k similar vectors to estimate leak location**

**Evaluate results -> run_pipeline() = Tests each case one-by-one and calculates prediction error**

In [None]:
# Example: 
# test-2 -> leak at [-4.85, -4.32]
# test-3 -> leak at [-4.89, -4.18]
# test-4 -> leak at [-5.18, -4.32]

# If you are testing test-3, the code:
# - compares test-3's sensor pattern to test-2 and test-4. 
# - finds which one it is more similar to (using DTW)
# - Averages those [X, Z] values to predict where test-3's leak is
# - Compare the prediction to the real answer. 

In [None]:
# Ks = [1, 3, 5, 7, 9]
# windows = [30, 50, 70, 90, 120]
# steps = [2, 4, 6, 8, 10]
# ths = [0.005, 0.01, 0.02, 0.03, 0.04] 

# best = {"params": None, "error": float("inf")}

# for K_, W, S, T in product(Ks, windows, steps, ths):
#     K = K_
#     MAX_WARPING_WINDOW = W
#     SUBSAMPLE_STEP = S
#     FFT_THRESHOLD = T

#     def fft_smooth(signal, threshold=FFT_THRESHOLD):
#         fft_vals = fft(signal)
#         fft_vals[np.abs(fft_vals) < threshold * np.max(np.abs(fft_vals))] = 0
#         return np.real(ifft(fft_vals))

#     df = run_pipeline()
#     avg_err = df["Error"].mean()

#     if avg_err < best["error"]:
#         best["error"] = avg_err
#         best["params"] = (K_, W, S, T)

# print("Best Avg error: ", best["error"], "with", best["params"])

# PLOT GRAPH

In [None]:
import matplotlib.pyplot as plt

tests = results_df["Test"].tolist()
true_x = results_df["True_X"].tolist()
pred_x = results_df["Pred_X"].tolist()
true_z = results_df["True_Z"].tolist()
pred_z = results_df["Pred_Z"].tolist()

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12,4), sharey=False)
ax1.plot(tests, true_x, marker = "o", label= "True X", linestyle="None")
ax1.plot(tests, pred_x, marker = "s", label= "Predicted X", linestyle="None")


plt.ylim([-6,-3])

ax1.set_title("True vs. Predicted X")
ax1.set_xlabel("Test")
#ax1.set_ylabel("X Coordinate")
ax1.legend()
#ax1.grid(True)
ax1.set_xticks(tests)
ax1.set_xticklabels(tests, rotation=30, ha="right")

ax2.plot(tests, true_z, marker = "o", label="True Z", linestyle="None")
ax2.plot(tests, pred_z, marker = "s", label="Predicted Z", linestyle="None")




ax2.set_title("True vs. Predicted Z")
#ax1.set_xlabel("Test")
#ax1.set_ylabel("Z Coordinate")
ax2.legend()
#ax2.grid(True)
ax2.set_xticks(tests)
ax2.set_xticklabels(tests, rotation=30, ha="right")

ax1.set_ylim(-6, -3)
ax2.set_ylim(-6, -3)

plt.tight_layout()
plt.show()

In [None]:
if __name__ == "__main__":
    results_df= run_pipeline() 

    print("Final predicted coordinates:\n")
    for _, row in results_df.iterrows():
        test = row["Test"]
        pred_x = row["Pred_X"]
        pred_z = row["Pred_Z"]

        print(f"{test:>6} -> [X, Z] = [{pred_x:.6f}, {pred_z:.6f}]")

In [None]:
# import matplotlib.pyplot as plt
# import pandas as pd


# plt.figure(figsize = (6,6))
# plt.scatter(
#     results_df['True_X'],
#     results_df['True_Z'], 
#     marker='o',
#     color='blue',
#     label='True'
# )

# plt.scatter(
#     results_df['Pred_X'],
#     results_df['Pred_Z'],
#     marker = 'x',
#     color='red',
#     label='Predicted'
# )

# for _, row in results_df.iterrows():
#     plt.plot(
#         [row['True_X'], row['Pred_X']],
#         [row['True_Z'], row['Pred_Z']],
#         linestyle='--',
#         color='gray'
#     )

#     plt.text(
#         row['Pred_X'],
#         row['Pred_Z'],
#         row['Test'],
#         color='blue',
#         fontsize=9,
#         verticalalignment='bottom',
#         horizontalalignment = 'right'
#     )

#     plt.text(
#         row['Pred_X'],
#         row['Pred_Z'],
#         row['Test'],
#         color='red',
#         fontsize=9,
#         verticalalignment='top',
#         horizontalalignment='left'
#     )

# plt.xlabel('X') 
# plt.ylabel('Z')
# plt.title('True vs Predicted')
# plt.legend()
# plt.grid(True)
# plt.tight_layout()
# plt.show() 