#Tabular Prior-Data Fitted Network (TapPFN) for Structural Damage Prediction

In [None]:
# (if necessary, typically run once)
# !pip install tabpfn pandas numpy seaborn matplotlib scikit-learn xgboost

# Clone and install the repository for TabPFN +10 classes
# !pip install "tabpfn-extensions[all] @ git+https://github.com/PriorLabs/tabpfn-extensions.git"

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from tabpfn import TabPFNClassifier
import time
import pickle
import os
import torch # TabPFN uses torch

Setup device for TabPFN

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

Data loading and normalization

In [None]:
fname = join(getcwd(),'data3SS2009.mat')
mat_contents = sio.loadmat(fname)
dataset = mat_contents['dataset']
N, Chno, Nc = dataset.shape

print(f'''
Number of samples (N): {N}
Number of channels (Chno): {Chno}
Number of cases (Nc): {Nc}
''')

y = mat_contents['labels'].reshape(Nc)
y_target = y

for i in range(len(y_target)):
    y_target[i] -= 1


Ch1 = dataset[:,0,:] # load cell: shaker force
Ch2 = dataset[:,1,:] # accelerometer: base
Ch3 = dataset[:,2,:] # accelerometer: 1st floor
Ch4 = dataset[:,3,:] # accelerometer: 2nd floor
Ch5 = dataset[:,4,:] # accelerometer: 3rd floor

Ts = 3.125 * 1e-3 # sampling time
time = (np.linspace(1,N,N) - 1) * Ts

na = 30

FeatARCh2 = []
FeatARCh3 = []
FeatARCh4 = []
FeatARCh5 = []

for i in tqdm(range(Nc)):
  res1 = AutoReg(Ch2[:,i], lags = na, trend = 'n').fit()
  res2 = AutoReg(Ch3[:,i], lags = na, trend = 'n').fit()
  res3 = AutoReg(Ch4[:,i], lags = na, trend = 'n').fit()
  res4 = AutoReg(Ch5[:,i], lags = na, trend = 'n').fit()

  FeatARCh2.append(res1.params)
  FeatARCh3.append(res2.params)
  FeatARCh4.append(res3.params)
  FeatARCh5.append(res4.params)

FeatARCh2 = np.array(FeatARCh2)
FeatARCh3 = np.array(FeatARCh3)
FeatARCh4 = np.array(FeatARCh4)
FeatARCh5 = np.array(FeatARCh5)

X1 = np.concatenate([FeatARCh2,FeatARCh3,FeatARCh4,FeatARCh5], axis = 1)

scaler = preprocessing.MinMaxScaler(feature_range=(-1, 1))
indDam = y > 9
dfARred = pd.concat([pd.DataFrame(scaler.fit_transform(X1)), pd.DataFrame({'target':indDam})],axis=1)
dfARred.describe()

Data visualization

In [None]:
dfARred_for_plot = dfARred.copy()
dfARred_for_plot['target'] = dfARred_for_plot['target'].map({False: 'Undamaged', True: 'Damaged'})
num_features = len(dfARred_for_plot.columns) - 1
plt.figure(figsize=[20, 7])
ax_top = plt.gca()
pd.plotting.parallel_coordinates(
    dfARred_for_plot,
    'target',
    colormap=plt.get_cmap('Paired'),
    ax=ax_top,
    axvlines=False
)
ax_top.set_xticks(range(0, num_features, 10))
ax_top.set_xticklabels(ax_top.get_xticks(), rotation=90)
plt.xlabel('AR Coefficient Index')
plt.ylabel('Normalized Value')
plt.title('Parallel Coordinates: AR Coefficients by Damage State')
plt.tight_layout()
plt.show()
plt.figure(figsize=[15, 6])
plt.subplot(121)
ax_left = plt.gca()
pd.plotting.parallel_coordinates(
    dfARred_for_plot[dfARred['target'] == False],
    'target',
    colormap=plt.get_cmap('Paired'),
    ax=ax_left,
    axvlines=False
)
plt.xlabel('AR Coefficient Index')
plt.ylabel('Normalized Value')
plt.title('Parallel Coordinates: Undamaged Cases', y=1.02)

ax_left.set_xticks(range(0, num_features, 10))
ax_left.set_xticklabels(ax_left.get_xticks(), rotation=90)
plt.subplot(122)
ax_right = plt.gca()
pd.plotting.parallel_coordinates(
    dfARred_for_plot[dfARred['target'] == True],
    'target',
    colormap=plt.get_cmap('Paired'),
    ax=ax_right,
    axvlines=False
)
plt.xlabel('AR Coefficient Index')
plt.title('Parallel Coordinates: Damaged Cases', y=1.02)
ax_right.set_xticks(range(0, num_features, 10))
ax_right.set_xticklabels(ax_right.get_xticks(), rotation=90)
plt.tight_layout()
plt.show()

In [None]:
indTest = [200, 600, 800]
nSpect = len(indTest)
Ts = 3.125 * 1e-3
Fs = 1 / Ts
channel_groups_data = [
    ([Ch2, Ch3], ['Ch2 (Base)', 'Ch3 (1st Floor)']),
    ([Ch4, Ch5], ['Ch4 (2nd Floor)', 'Ch5 (3rd Floor)'])
]
for group_idx, (current_channels, current_channel_names) in enumerate(channel_groups_data):
    num_channels_in_group = len(current_channels)
    fig, axTuple = plt.subplots(nrows=nSpect, ncols=num_channels_in_group,
                                figsize=(20, 12),
                                sharex='col', sharey='row',
                                gridspec_kw={'wspace': 0.4, 'hspace': 0.4})
    suptitle_text = f'Spectrograms for {current_channel_names[0]} and {current_channel_names[1]}'
    fig.suptitle(suptitle_text, fontsize=18)
    ims_for_colorbars = [None] * num_channels_in_group
    for col_idx, (channel_data, channel_name) in enumerate(zip(current_channels, current_channel_names)):
        for row_idx, time_point_idx in enumerate(indTest):
            current_ax = axTuple[row_idx, col_idx]
            x = channel_data[:, time_point_idx]
            Pxx, freqs, bins, im = current_ax.specgram(x, Fs=Fs)
            if row_idx == nSpect - 1:
                ims_for_colorbars[col_idx] = im
            current_ax.set_title(f'{channel_name} - Time Point: {time_point_idx}', fontsize=12)
            current_ax.grid(True)
            if row_idx == nSpect - 1:
                current_ax.set_xlabel('Time [s]', fontsize=10)
            if col_idx == 0:
                current_ax.set_ylabel('Frequency [Hz]', fontsize=10)
            current_ax.tick_params(axis='both', which='major', labelsize=9)
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    bottom_y_overall = axTuple[nSpect-1, 0].get_position().y0
    top_y_overall = axTuple[0, 0].get_position().y1
    height_overall_subplots = top_y_overall - bottom_y_overall
    cbar_ax0 = fig.add_axes([pos_col0_right_edge + 0.02, bottom_y_overall, 0.015, height_overall_subplots])
    cbar0 = fig.colorbar(ims_for_colorbars[0], cax=cbar_ax0)
    cbar0.set_label('Frequency [Hz]', fontsize=10)
    cbar_ax1 = fig.add_axes([pos_col1_right_edge + 0.02, bottom_y_overall, 0.015, height_overall_subplots])
    cbar1 = fig.colorbar(ims_for_colorbars[1], cax=cbar_ax1)
    cbar1.set_label('Frequency [Hz]', fontsize=10)
    plt.show()

Training and testing

In [None]:
quicktest = True
if quicktest:
    iterTest = 2
    kfolds, nk = (4, 7)
    n_search = 5
else:
    iterTest = 15
    kfolds, nk = (5, 10)
    n_search = 25

PredDF = pd.DataFrame(columns=["y", "yh", "feature", "model", "nholdout"])
ModelsDF = pd.DataFrame(columns=["feature", "model", "nbytes", "nholdout"])
TimeDF = pd.DataFrame(columns=["feature", "model", "inference_time", "nholdout"])
models = [
    ("TabPFN", ManyClassClassifier(
       estimator=TabPFNClassifier(device='cuda'),
       alphabet_size=10,
       n_estimators=15
    )),
    ("SVC", SVC(probability=True)),
    ("KNN", KNeighborsClassifier()),
    ("RandomForest", RandomForestClassifier()),
    ("XGBoost", XGBClassifier(use_label_encoder=False, eval_metric='mlogloss'))
]
param_grids = [
    None,  # TabPFN does not require hyperparameter adjustment
    {
       "SVC__C": stats.loguniform(1e-3, 1e3),
       "SVC__kernel": ["linear", "rbf", "poly"],
       "SVC__gamma": stats.loguniform(1e-4, 1e0)
    },
    {
        "KNN__n_neighbors": stats.randint(3, 20),
        "KNN__weights": ["uniform", "distance"]
    },
    {
        "RandomForest__n_estimators": stats.randint(50, 200),
        "RandomForest__max_depth": stats.randint(5, 30),
        "RandomForest__min_samples_split": stats.randint(2, 10)
    },
    {
        "XGBoost__n_estimators": stats.randint(50, 200),
        "XGBoost__max_depth": stats.randint(3, 10),
        "XGBoost__learning_rate": stats.uniform(0.01, 0.3)
    }
]

featList = [X1]
featName = ['X1']
Nfeats = len(featList)

for k, (model_name, model) in enumerate(models):
    for j in range(Nfeats):
        X = featList[j]
        y = y_target
        print(f"\nModel: {model_name}")
        for i in tqdm(range(iterTest)):
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.5, stratify=y, random_state=i
            )
            rkf = RepeatedStratifiedKFold(n_splits=kfolds, n_repeats=nk, random_state=i)
            clf = Pipeline([("scaler", StandardScaler()), (model_name, model)])
            param_grid = param_grids[k]
            if param_grid:
                random_search = RandomizedSearchCV(
                    clf,
                    param_distributions=param_grid,
                    n_iter=n_search,
                    cv=rkf,
                    scoring="accuracy",
                    n_jobs=-1,
                    random_state=i
                )
                random_search.fit(X_train, y_train)
                best_model = random_search.best_estimator_
            else:
                clf.fit(X_train, y_train)
                best_model = clf
            start_time = time.time()
            yh_test = best_model.predict(X_test)
            inference_time = time.time() - start_time

            with open('temp_model.pickle', 'wb') as handle:
                pk.dump(best_model, handle, protocol=pk.HIGHEST_PROTOCOL)
            nbytes = os.path.getsize('temp_model.pickle')
            PredDF = pd.concat([PredDF, pd.DataFrame({
                "y": y_test,
                "yh": yh_test,
                "feature": featName[j],
                "model": model_name,
                "nholdout": i
            })], ignore_index=True)
            ModelsDF = pd.concat([ModelsDF, pd.DataFrame({
                "feature": [featName[j]],
                "model": [model_name],
                "nbytes": [nbytes],
                "nholdout": [i]
            })], ignore_index=True)
            TimeDF = pd.concat([TimeDF, pd.DataFrame({
                "feature": [featName[j]],
                "model": [model_name],
                "inference_time": [inference_time],
                "nholdout": [i]
            })], ignore_index=True)


with open(f"results.p", "wb") as fname:
    pk.dump([PredDF, ModelsDF, [name for name, _ in models], featName, TimeDF], fname)