<a href="https://colab.research.google.com/github/SahelKherad/3-story-benchmark-transformer/blob/main/Untitled36.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import os
import scipy.io
from scipy.io.matlab._mio5_params import mat_struct
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader

In [3]:
# Mount Google Drive under /content/drive so later cells can read files from it.
# force_remount=True requests a fresh mount even when a mount already exists.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [5]:
# List the files in the ASCE folder of the mounted Drive.
# NOTE(review): this path is spelled "My Drive" while the loading cells use "MyDrive" —
# confirm both spellings resolve to the same folder.
!ls "/content/drive/My Drive/ASCE"


shm01a.mat  shm03a.mat	shm05a.mat  shm07a.mat	shm09a.mat
shm02a.mat  shm04a.mat	shm06a.mat  shm08a.mat


In [6]:
import scipy.io

# NOTE: despite its name, `folder_path` holds the path of a single .mat file, not a directory.
folder_path = '/content/drive/MyDrive/ASCE/shm01a.mat'  # adjust if your filename differs
# loadmat returns a dict-like object keyed by the variable names stored in the file.
data = scipy.io.loadmat(folder_path)

# Show which variables the file contains before writing a dedicated loader.
print(data.keys())

dict_keys(['__header__', '__version__', '__globals__', 'dasy', 'dasy_dscr', 'filedescription', 'fsdasy'])


In [9]:
def load_mat(path):
    """
    Read the 'dasy' variable of a MATLAB file as a 2-D array.

    path: location of the .mat file.
    returns: (table, contents) where `table` is a 2-D np.ndarray and
             `contents` is the full mapping returned by scipy.io.loadmat.
    raises: KeyError if the file has no 'dasy' variable;
            ValueError if 'dasy' is neither a struct nor an ndarray, or if the
            unwrapped result is not 2-D.
    """
    # Singleton dimensions are squeezed and structs are returned as attribute objects.
    contents = scipy.io.loadmat(path, squeeze_me=True, struct_as_record=False)
    payload = contents.get('dasy')
    if payload is None:
        raise KeyError(f"'dasy' not found in {path}; keys: {list(contents.keys())}")

    # A struct can arrive wrapped in a 0-d object array; peel that wrapper off.
    is_zero_dim = isinstance(payload, np.ndarray) and payload.shape == ()
    if is_zero_dim and isinstance(payload.item(), mat_struct):
        payload = payload.item()

    if isinstance(payload, mat_struct):
        # One struct field per column: flatten each field and place them side by side.
        columns = [
            np.asarray(getattr(payload, field_name)).reshape(-1)
            for field_name in payload._fieldnames
        ]
        table = np.stack(columns, axis=1)
    elif isinstance(payload, np.ndarray):
        # Already an array: use it unchanged.
        table = payload
    else:
        raise ValueError(f"Cannot interpret dasy of type {type(payload)}")

    # Whatever route was taken, the caller is promised a 2-D array.
    is_matrix = isinstance(table, np.ndarray) and table.ndim == 2
    if not is_matrix:
        raise ValueError(f"After unwrapping, expected 2‐D array but got shape {getattr(table,'shape',None)}")

    return table, contents


# Location of the recordings and the file belonging to each entry.
# The integer key of each entry is the value the loading loop stores as that
# file's label (so labels run 1..9 here, not 0-based).
base_dir = '/content/drive/MyDrive/ASCE/'
state_files = {
    1: 'shm01a.mat',   # label 1
    2: 'shm02a.mat',   # label 2
    3: 'shm03a.mat',   # label 3
    4: 'shm04a.mat',   # label 4
    5: 'shm05a.mat',   # label 5
    6: 'shm06a.mat',   # label 6
    7: 'shm07a.mat',   # label 7
    8: 'shm08a.mat',   # label 8
    9: 'shm09a.mat',   # label 9
}



In [10]:
# Load every file listed in state_files, collecting one data array and one
# label vector per file (both lists follow the iteration order of state_files).
all_data, all_labels = [], []
for label, fname in state_files.items():
    path = os.path.join(base_dir, fname)
    dasy, mat = load_mat(path)  # `mat` (the full loadmat result) is not used in this loop
    print(f"{fname}: data shape = {dasy.shape}")
    all_data.append(dasy)
    # One label per row of dasy; the label value is the state_files key.
    all_labels.append(np.full(dasy.shape[0], label, dtype=np.int64))
# Join the per-file label vectors into a single 1-D array.
all_labels2 = np.concatenate(all_labels)
print(all_labels2.shape)
print(all_labels[5].shape)  # label vector of the sixth file in state_files
# print(all_data)

shm01a.mat: data shape = (60000, 16)
shm02a.mat: data shape = (60000, 16)
shm03a.mat: data shape = (60000, 16)
shm04a.mat: data shape = (60000, 16)
shm05a.mat: data shape = (60000, 16)
shm06a.mat: data shape = (45568, 16)
shm07a.mat: data shape = (180000, 16)
shm08a.mat: data shape = (180000, 16)
shm09a.mat: data shape = (180000, 16)
(885568,)
(45568,)


In [11]:
# Bind each loaded recording to its own name: all_data[0] -> data_state1, ...,
# all_data[8] -> data_state9.
(
    data_state1, data_state2, data_state3,
    data_state4, data_state5, data_state6,
    data_state7, data_state8, data_state9,
) = (all_data[position] for position in range(9))

# Spot-check the shapes of three of the recordings, one per line.
print(data_state1.shape, data_state6.shape, data_state8.shape, sep="\n")

(60000, 16)
(45568, 16)
(180000, 16)


In [40]:
def segment_signal(x, L=1024, S=512, channel=1):
    """
    Cut a multi-channel recording into fixed-length, possibly overlapping windows.

    x:       array-like of shape (n_samples, n_channels).
    L:       window length in samples (>= 1).
    S:       stride between consecutive window starts in samples (>= 1).
    channel: which column(s) of x to keep.
             * int (default 1): keep that single column, as this function
               always did -> result shape (n_windows, L).
             * sequence of ints: keep those columns
               -> result shape (n_windows, L, len(channel)).
             * None: keep every column -> result shape (n_windows, L, n_channels).

    returns: a new np.ndarray of windows, where
             n_windows = (n_samples - L) // S + 1. Trailing samples that do not
             fill a complete window are dropped.
    raises:  ValueError if x is not 2-D, if L or S is not positive, or if x has
             fewer than L samples (no complete window exists).
    """
    x = np.asarray(x)
    if x.ndim != 2:
        raise ValueError(f"x must be 2-D (n_samples, n_channels), got shape {x.shape}")
    if L < 1 or S < 1:
        raise ValueError(f"L and S must be positive, got L={L}, S={S}")

    n_samples = x.shape[0]
    if n_samples < L:
        raise ValueError(
            f"signal has {n_samples} samples, fewer than the window length L={L}"
        )

    # Row indices of every window: one row of `window_rows` per window start.
    starts = np.arange(0, n_samples - L + 1, S)
    window_rows = starts[:, None] + np.arange(L)[None, :]   # (n_windows, L)

    # Select the column(s) first so only the requested data is copied.
    selected = x if channel is None else x[:, channel]
    return selected[window_rows]

# Window the two recordings that are compared below (state 1 vs. state 3).
seg1 = segment_signal(data_state1, L=1024, S=512)
seg3 = segment_signal(data_state3, L=1024, S=512)

print("State1 windows:", seg1.shape)  # (116, 1024) for a 60000-sample recording
print("State3 windows:", seg3.shape)


State1 windows: (116, 1024)
State3 windows: (116, 1024)


In [41]:
# Stack the windows of both states along the first axis; state-1 windows come first.
X = np.concatenate((seg1, seg3), axis=0)
# One int64 label per window: 0 for every state-1 window, then 1 for every state-3 window.
y = np.repeat(np.array([0, 1], dtype=np.int64), [len(seg1), len(seg3)])

print("X shape:", X.shape)
print("y shape:", y.shape, "unique labels:", np.unique(y))


X shape: (232, 1024)
y shape: (232,) unique labels: [0 1]


In [59]:
# Convert the NumPy arrays to tensors; the isinstance guards keep the cell
# safe to re-run after X / y have already been converted.
if not isinstance(X, torch.Tensor):
    X = torch.from_numpy(X).float()
if not isinstance(y, torch.Tensor):
    y = torch.from_numpy(y)

# Wrap the tensors in a TensorDataset so that dataset[i] is the pair (X[i], y[i]).
# A plain (X, y) tuple would be treated by DataLoader as a 2-item dataset whose
# "samples" are X and y themselves, not as (window, label) pairs.
dataset = torch.utils.data.TensorDataset(X, y)
loader  = DataLoader(dataset, batch_size=32, shuffle=True)

# Pull one batch to confirm the shapes the model will receive.
xb, yb = next(iter(loader))
print("Batch X:", xb.shape)  # (32, 1024): 32 windows of 1024 samples each
print("Batch y:", yb.shape)  # (32,)


Batch X: torch.Size([32, 1024])
Batch y: torch.Size([32])


In [52]:
import torch.nn as nn

class TimeSeriesTransformer(nn.Module):
    """
    Transformer-encoder classifier for fixed-length signal windows.

    Each time step is projected to d_model features, a learned positional
    embedding is added, the sequence passes through a TransformerEncoder, the
    encoder output is averaged over time and a small MLP produces class logits.

    n_channels:  number of values per time step.
    d_model:     embedding width used throughout the encoder.
    n_heads:     attention heads per encoder layer.
    num_layers:  number of encoder layers.
    num_classes: number of output logits.
    max_len:     longest sequence the learned positional embedding supports
                 (default 1024, the value that was previously hard-coded).
    """

    def __init__(self, n_channels=1, d_model=64, n_heads=4, num_layers=3, num_classes=2,
                 max_len=1024):
        super().__init__()
        # 1) per-step embedding
        self.input_proj = nn.Linear(n_channels, d_model)
        # 2) learned positional embedding, one d_model vector per time step
        self.pos_enc = nn.Parameter(torch.randn(1, max_len, d_model))
        # 3) transformer encoder (sequence-first layout, see forward)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads, dim_feedforward=4*d_model, dropout=0.1
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # 4) classification head
        self.classifier = nn.Sequential(
            nn.Linear(d_model, d_model//2),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(d_model//2, num_classes),
        )

    def forward(self, x):
        """
        x: (batch, L, n_channels), or (batch, L) when the model was built with
           n_channels=1 (a trailing channel axis is added automatically).
        returns: logits of shape (batch, num_classes).
        raises: ValueError for any other input rank, for 2-D input on a
                multi-channel model, or when L exceeds max_len.
        """
        if x.dim() == 2:
            # Single-channel windows are commonly stored without a channel axis.
            if self.input_proj.in_features != 1:
                raise ValueError(
                    f"2-D input is only accepted when n_channels == 1, "
                    f"but the model expects {self.input_proj.in_features} channels"
                )
            x = x.unsqueeze(-1)                # → (batch, L, 1)
        elif x.dim() != 3:
            raise ValueError(
                f"expected input of shape (batch, L, C) or (batch, L), got {tuple(x.shape)}"
            )

        seq_len = x.size(1)
        if seq_len > self.pos_enc.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len={self.pos_enc.size(1)}"
            )

        x = self.input_proj(x)                 # → (batch, L, d_model)
        x = x + self.pos_enc[:, :seq_len]      # add positional embedding
        x = x.permute(1, 0, 2)                 # → (L, batch, d_model) for Transformer
        x = self.transformer(x)                # → (L, batch, d_model)
        x = x.mean(dim=0)                      # global-average over time → (batch, d_model)
        return self.classifier(x)              # → (batch, num_classes)


In [55]:
# Build the classifier with its default hyperparameters
# (n_channels=1, d_model=64, n_heads=4, num_layers=3, num_classes=2).
model = TimeSeriesTransformer()



In [56]:
# Cross-entropy loss applied to the model's class logits.
criterion = nn.CrossEntropyLoss()
# AdamW over all model parameters, learning rate 1e-4 and weight decay 1e-2.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-2)
